Source code for covidor.kmers

#
#  This file is part of Covidor software
#
#  Copyright (c) 2022 - Sequana Dev Team (https://covidor.readthedocs.io)
#
#  Distributed under the terms of the 3-clause BSD license.
#  The full license is in the LICENSE file, distributed with this software.
#
#  Website:       https://github.com/sequana/covidor
#  Documentation: http://sequana.readthedocs.io
#  Contributors:  https://github.com/sequana/covidor/graphs/contributors
##############################################################################
import glob
import random

from pathlib import Path

import pandas as pd
from covidor.fasta import FastA

__all__ = ["KmerOverlap"]


[docs]class KmerOverlap: """ from covidor import KmerOverlap ko = KmerOverlap() for fasta in fastas: ko.enumerate_unique_kmers(fasta) names = self.kmer_dict.keys() ko.get_list_specific_kmers(name) """ def __init__(self, kmer_len=35): self.kmer_len = kmer_len self.kmer_dict = {} def _get_kmers(self, sequence): """Return all unique kmers found in a sequence""" kmer_set = set() for start, end in zip(range(0, len(sequence) - self.kmer_len - 1), range(self.kmer_len,len(sequence) + 1)): kmer_set.add(sequence[start:end]) return kmer_set
[docs] def enumerate_unique_kmers(self, fasta_files): """populate kmers for each fasta""" for fas in fasta_files: fas = FastA(fas) if len(fas) != 1: #pragma: no cover raise IOError("Fasta file should contain a single sequence") sequence = fas.sequences[0] kmers = self._get_kmers(sequence) self.kmer_dict[fas.filename] = kmers
[docs] def get_list_specific_kmers(self, name): """Returns kmer specific (unique) to a fasta""" kmers = self.kmer_dict[name] other_kmers = set.union(*[self.kmer_dict[x] for x in self.kmer_dict if x!=name]) specific_kmers = [x for x in kmers if x not in other_kmers] return specific_kmers
[docs] def get_specific_kmers(self): """Return dataframe with number of unique kmers in each fasta""" res_dict = {} for k in self.kmer_dict: #TODO reuse the specific kmer sel_kmers = self.kmer_dict[k] other_kmers = [self.kmer_dict[x] for x in self.kmer_dict if x != k] other_kmers = set.union(*other_kmers) res_dict[k] = len(sel_kmers - other_kmers) return pd.DataFrame(res_dict, index=["n_unique_kmers"]).T
[docs] def get_proportion_kmer_specific(self, name): """ Gives same results as get_specific_kmers but normalised and for a given name""" S = 0 seq = FastA(name).sequences[0] kmers = self.get_list_specific_kmers(name) for i in range(len(seq) - self.kmer_len): if seq[i:i+self.kmer_len] in kmers: S += 1 return S / len(seq) * 100
[docs] def get_proportion_reads_specific(self, name, N=100, read_length=75, paired=False): """ import glob fastas = glob.glob("databases/genomes/*fasta") kmer_dict = get_all_kmers() expected_percentage = get_proportion_reads_specific(kmer_dict, "NAME_ONE_FASTA") """ seq = FastA(name).sequences[0] kmers_positions = [] kmers = self.get_list_specific_kmers(name) for i in range(len(seq)-35): if seq[i:i+35] in kmers: kmers_positions.append(i) overlap = 0 reads_positions = [random.randint(0, len(seq)) for x in range(N)] count = 0 for pos in reads_positions: pos1 = pos pos2 = pos1 + read_length for kmer_pos in kmers_positions: k1 = kmer_pos k2 = kmer_pos + self.kmer_len if k1 >=pos1 and k2<=pos2: # we found a kmer in the read overlap += 1 break if paired: k1 = kmer_pos k2 = kmer_pos + self.kmer_len if k1 >=pos1+100 and k2<=pos2+100: # we found a kmer in the read overlap += 1 break count+=1 if count % 10000 == 0: #pragma no cover print(f"{count}/{N}") return overlap / N * 100
# MonteCarlo """ expected = { '100_pe': [3.3, 16.0,10.84, 18.45, 17.5, 20.43], '100_se': [1.68,8.73,5.64,9.37,9.55,11.88], '75_se': [1.22,6.6,4.25,7.2,7.5,9.1], '75_pe': [2.4,12.52,8.33,13.89,13.93,16.25] '150_se': [2.22,12.46,8.45,14.51,13.9,16.45] } for length in [75, 100]: for paired in ['se', 'pe']: print(length, paired) exp = [] for x in fastas: overlap = get_proportion_reads_specific(kmer_dict, x, N=100000, read_length=length, paired=paired=='pe') exp.append(overlap) expected[f'{length}_{paired}'] = exp fastas = sorted(glob.glob("covid/*")) kmer_dict = get_all_kmers(fastas) #resultats de simulation avec 100,000 reads (a multiplier par 6 donc) MiSeq res['100_se'] = [x*6 for x in [0.26,1.30, 0.88,1.43,1.50,1.89]] res['100_pe'] = [x*6 for x in [0.51,2.4,1.67,2.70,2.76,3.24]] # analyse 6Mreads en res['75_se'] = [x*6 for x in [0.19, 0.98,0.65,1.07,1.14,1.45]] res['75_pe'] = [x*6 for x in [0.38, 1.82, 1.27, 2.1, 2.14, 2.59]] # perform a simulation: #art_illumina -ss MSv1 -i covid/40000000_MZ344997.fasta -l 100 -c 1000000 -p -m 200 -s 10 -o /tmp/tmpk088f7am/ART_paired_ -na #kraken2 art/art_mixed_MSv1_100_1.fq --db covidor2/ --out kraken.out --report test ; more test """