#
# This file is part of Covidor software
#
# Copyright (c) 2022 - Sequana Dev Team (https://covidor.readthedocs.io)
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# Website: https://github.com/sequana/covidor
# Documentation: http://sequana.readthedocs.io
# Contributors: https://github.com/sequana/covidor/graphs/contributors
##############################################################################
import glob
import random
from pathlib import Path
import pandas as pd
from covidor.fasta import FastA
__all__ = ["KmerOverlap"]
[docs]class KmerOverlap:
"""
from covidor import KmerOverlap
ko = KmerOverlap()
for fasta in fastas:
ko.enumerate_unique_kmers(fasta)
names = self.kmer_dict.keys()
ko.get_list_specific_kmers(name)
"""
def __init__(self, kmer_len=35):
self.kmer_len = kmer_len
self.kmer_dict = {}
def _get_kmers(self, sequence):
"""Return all unique kmers found in a sequence"""
kmer_set = set()
for start, end in zip(range(0, len(sequence) - self.kmer_len - 1),
range(self.kmer_len,len(sequence) + 1)):
kmer_set.add(sequence[start:end])
return kmer_set
[docs] def enumerate_unique_kmers(self, fasta_files):
"""populate kmers for each fasta"""
for fas in fasta_files:
fas = FastA(fas)
if len(fas) != 1: #pragma: no cover
raise IOError("Fasta file should contain a single sequence")
sequence = fas.sequences[0]
kmers = self._get_kmers(sequence)
self.kmer_dict[fas.filename] = kmers
[docs] def get_list_specific_kmers(self, name):
"""Returns kmer specific (unique) to a fasta"""
kmers = self.kmer_dict[name]
other_kmers = set.union(*[self.kmer_dict[x] for x in self.kmer_dict if x!=name])
specific_kmers = [x for x in kmers if x not in other_kmers]
return specific_kmers
[docs] def get_specific_kmers(self):
"""Return dataframe with number of unique kmers in each fasta"""
res_dict = {}
for k in self.kmer_dict:
#TODO reuse the specific kmer
sel_kmers = self.kmer_dict[k]
other_kmers = [self.kmer_dict[x] for x in self.kmer_dict
if x != k]
other_kmers = set.union(*other_kmers)
res_dict[k] = len(sel_kmers - other_kmers)
return pd.DataFrame(res_dict, index=["n_unique_kmers"]).T
[docs] def get_proportion_kmer_specific(self, name):
""" Gives same results as get_specific_kmers but normalised and for a given name"""
S = 0
seq = FastA(name).sequences[0]
kmers = self.get_list_specific_kmers(name)
for i in range(len(seq) - self.kmer_len):
if seq[i:i+self.kmer_len] in kmers:
S += 1
return S / len(seq) * 100
[docs] def get_proportion_reads_specific(self, name, N=100, read_length=75, paired=False):
"""
import glob
fastas = glob.glob("databases/genomes/*fasta")
kmer_dict = get_all_kmers()
expected_percentage = get_proportion_reads_specific(kmer_dict, "NAME_ONE_FASTA")
"""
seq = FastA(name).sequences[0]
kmers_positions = []
kmers = self.get_list_specific_kmers(name)
for i in range(len(seq)-35):
if seq[i:i+35] in kmers:
kmers_positions.append(i)
overlap = 0
reads_positions = [random.randint(0, len(seq)) for x in range(N)]
count = 0
for pos in reads_positions:
pos1 = pos
pos2 = pos1 + read_length
for kmer_pos in kmers_positions:
k1 = kmer_pos
k2 = kmer_pos + self.kmer_len
if k1 >=pos1 and k2<=pos2: # we found a kmer in the read
overlap += 1
break
if paired:
k1 = kmer_pos
k2 = kmer_pos + self.kmer_len
if k1 >=pos1+100 and k2<=pos2+100: # we found a kmer in the read
overlap += 1
break
count+=1
if count % 10000 == 0: #pragma no cover
print(f"{count}/{N}")
return overlap / N * 100
# MonteCarlo
"""
expected = {
'100_pe': [3.3, 16.0,10.84, 18.45, 17.5, 20.43],
'100_se': [1.68,8.73,5.64,9.37,9.55,11.88],
'75_se': [1.22,6.6,4.25,7.2,7.5,9.1],
'75_pe': [2.4,12.52,8.33,13.89,13.93,16.25]
'150_se': [2.22,12.46,8.45,14.51,13.9,16.45]
}
for length in [75, 100]:
for paired in ['se', 'pe']:
print(length, paired)
exp = []
for x in fastas:
overlap = get_proportion_reads_specific(kmer_dict, x, N=100000, read_length=length, paired=paired=='pe')
exp.append(overlap)
expected[f'{length}_{paired}'] = exp
fastas = sorted(glob.glob("covid/*"))
kmer_dict = get_all_kmers(fastas)
#resultats de simulation avec 100,000 reads (a multiplier par 6 donc) MiSeq
res['100_se'] = [x*6 for x in [0.26,1.30, 0.88,1.43,1.50,1.89]]
res['100_pe'] = [x*6 for x in [0.51,2.4,1.67,2.70,2.76,3.24]] # analyse 6Mreads en
res['75_se'] = [x*6 for x in [0.19, 0.98,0.65,1.07,1.14,1.45]]
res['75_pe'] = [x*6 for x in [0.38, 1.82, 1.27, 2.1, 2.14, 2.59]]
# perform a simulation:
#art_illumina -ss MSv1 -i covid/40000000_MZ344997.fasta -l 100 -c 1000000 -p -m 200 -s 10 -o /tmp/tmpk088f7am/ART_paired_ -na
#kraken2 art/art_mixed_MSv1_100_1.fq --db covidor2/ --out kraken.out --report test ; more test
"""