# This file is part of Sequana software
#
# Copyright (c) 2016-2021 - Sequana Development Team
#
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
import os
import shutil
import sys
from pathlib import Path, PosixPath
import colorlog
from colormap import Colormap
from easydev import TempFile, md5
from snakemake import shell
from sequana import sequana_config_path
from sequana.lazy import numpy as np
from sequana.lazy import pandas as pd
from sequana.lazy import pylab
from sequana.misc import wget
logger = colorlog.getLogger(__name__)
__all__ = [
"KrakenResults",
"KrakenPipeline",
"KrakenAnalysis",
"KrakenSequential",
"KrakenDB",
]
class KrakenDB:
    """Class to handle a kraken DB.

    The input may be a path to a database directory, the name of a database
    stored in the sequana config directory (under ``kraken2_dbs``), or
    another :class:`KrakenDB` instance (its path is reused).

    :raises IOError: if the database cannot be found in either location.
    """

    def __init__(self, filename):
        if isinstance(filename, KrakenDB):
            filename = filename.path

        if os.path.exists(filename):
            self.path = os.path.abspath(filename)
        else:
            # fall back on the sequana config directory
            possible_path = sequana_config_path + "/kraken2_dbs/" + filename
            if os.path.exists(possible_path):
                self.path = possible_path
            else:
                msg = f"{filename} not found locally or in {sequana_config_path}."
                raise IOError(msg)

        self.name = os.path.basename(self.path)

    def _get_database_version(self):
        # a kraken2 database directory is identified by the hash.k2d file
        if os.path.exists(self.path + os.sep + "hash.k2d"):
            return "kraken2"
        else:  # pragma: no cover
            # returns None implicitly; callers compare version != "kraken2"
            logger.error("Sequana supports kraken2 only. Looks like an invalid kraken database directory")

    version = property(_get_database_version)

    def __repr__(self):
        return self.name
[docs]class KrakenResults(object):
"""Translate Kraken results into a Krona-compatible file
If you run a kraken analysis with :class:`KrakenAnalysis`, you will end up
with a file e.g. named kraken.out (by default).
You could use kraken-translate but then you need extra parsing to convert
into a Krona-compatible file. Here, we take the output from kraken and
directly transform it to a krona-compatible file.
kraken2 uses the --use-names that needs extra parsing.
::
k = KrakenResults("kraken.out")
k.kraken_to_krona()
Then format expected looks like::
C HISEQ:426:C5T65ACXX:5:2301:18719:16377 1 203 1:71 A:31 1:71
C HISEQ:426:C5T65ACXX:5:2301:21238:16397 1 202 1:71 A:31 1:71
Where each row corresponds to one read.
::
"562:13 561:4 A:31 0:1 562:3" would indicate that:
the first 13 k-mers mapped to taxonomy ID #562
the next 4 k-mers mapped to taxonomy ID #561
the next 31 k-mers contained an ambiguous nucleotide
the next k-mer was not in the database
the last 3 k-mers mapped to taxonomy ID #562
For kraken2, format is slighlty different since it depends on paired or not.
If paired, ::
C read1 2697049 151|151 2697049:117 |:| 0:1 2697049:116
See kraken documentation for details.
.. note:: a taxon of ID 1 (root) means that the read is classified but in
differen domain. https://github.com/DerrickWood/kraken/issues/100
.. note:: This takes care of fetching taxons and the corresponding lineages
from online web services.
"""
def __init__(self, filename="kraken.out", verbose=True):
    """.. rubric:: **constructor**

    :param filename: the input from KrakenAnalysis class
    :param verbose: forwarded to the Taxonomy instance
    """
    self.filename = filename

    # On readthedocs there is no network access nor local taxonomy file,
    # so a small stub Taxonomy class backed by a bundled CSV is used instead.
    on_rtd = os.environ.get("READTHEDOCS", None) == "True"

    if on_rtd is False:
        from sequana.taxonomy import Taxonomy

        self.tax = Taxonomy(verbose=verbose)
        self.tax.download_taxonomic_file()  # make sure it is available locally
    else:  # pragma: no cover

        class Taxonomy(object):  # pragma: no cover
            from sequana import sequana_data  # must be local

            # lookup table taxon ID -> ranks, loaded once at class creation
            df = pd.read_csv(sequana_data("test_taxon_rtd.csv"), index_col=0)

            def get_lineage_and_rank(self, x):
                # Note that we add the name as well here
                ranks = [
                    "kingdom",
                    "phylum",
                    "class",
                    "order",
                    "family",
                    "genus",
                    "species",
                    "name",
                ]
                return [(self.df.loc[x][rank], rank) for rank in ranks]

        self.tax = Taxonomy()

    if filename:
        # This initialise the data
        self._parse_data()
        self._data_created = False
[docs] def get_taxonomy_db(self, ids):
"""Retrieve taxons given a list of taxons
:param list ids: list of taxons as strings or integers. Could also
be a single string or a single integer
:return: a dataframe
.. note:: the first call first loads all taxons in memory and takes a
few seconds but subsequent calls are much faster
"""
# filter the lineage to keep only information from one of the main rank
# that is superkingdom, kingdom, phylum, class, order, family, genus and
# species
ranks = ("kingdom", "phylum", "class", "order", "family", "genus", "species")
if isinstance(ids, int):
ids = [ids]
if len(ids) == 0:
return pd.DataFrame()
if isinstance(ids, list) is False:
ids = [ids]
lineage = [self.tax.get_lineage_and_rank(x) for x in ids]
# Now, we filter each lineage to keep only relevant ranks
# There are a few caveats though as explained hereafter
# we merge the kingdom and superkingdom and subkingdom
results = []
for i, this in enumerate(lineage):
default = dict.fromkeys(ranks, " ")
for entry in this:
if entry[1] in ranks:
default[entry[1]] = entry[0]
# if there is a superkingdom, overwrite the kingdom
for entry in this:
if entry[1] == "superkingdom":
default["kingdom"] = entry[0]
if default["kingdom"] == " ":
for entry in this:
if entry[1] == "subkingdom":
default["kingdom"] = entry[0]
# in theory, we have now populated all ranks;
# Yet, there are several special cases (need examples):
# 1. all ranks are filled: perfect
# 2. some ranks are empty: we fill them with a space.
# 3. all ranks are empty:
# a. this is the root
# b. this may be expected. e.g for an artifical sequence
# c. all ranks below species are empty --> this is probably
# what we will get e.g. for plasmids
# case 3.b
if set([x[1] for x in this]) == {"no rank", "species"}:
# we can ignore the root and keep the others
# if we end up with more than 6 entries, this is annoying
# let us put a warning for now.
count = 0
for x in this:
if x[1] == "no rank" and x[0] != "root":
default[ranks[count]] = x[0]
count += 1
if count > 6:
logger.warning("too many no_rank in taxon{}".format(ids[i]))
break
# for the name, we take the last entry, which is suppose to be the
# scientific name found, so the scientific name of the taxon itself.
# Note that this is not alwyas the species rank name
# For instance for the taxon 2509511, the ID correspond to
# a subgenus of Sarbecovirus and has no species entry.
last_name, last_rank = this[-1]
if last_rank not in ["species", "no rank"]:
default["name"] = f"{last_rank}:{last_name}"
else:
default["name"] = ""
results.append(default)
df = pd.DataFrame.from_records(results)
df.index = ids
df = df[list(ranks) + ["name"]]
df.index = df.index.astype(int)
return df
def _parse_data(self):
taxonomy = {}
logger.info("Reading kraken data from {}".format(self.filename))
columns = ["status", "taxon", "length"]
# we select only col 0,2,3 to save memory, which is required on very
# large files
try:
# each call to concat in the for loop below
# will take time and increase with chunk position.
# for 15M reads, this has a big cost. So chunksize set to 1M
# is better than 1000 and still reasonable in memory
reader = pd.read_csv(
self.filename,
sep="\t",
header=None,
usecols=[0, 2, 3],
chunksize=1000000,
)
except (pd.errors.EmptyDataError, FileNotFoundError): # pragma: no cover
logger.warning("Empty files. 100%% unclassified ?")
self.unclassified = 0 # size of the input data set
self.classified = 0
self._df = pd.DataFrame([], columns=columns)
self._taxons = self._df.taxon
return
except pd.errors.ParserError:
# raise NotImplementedError # this section is for the case
# #only_classified_output when there is no found classified read
raise NotImplementedError
for chunk in reader:
try:
self._df
self._df = pd.concat([self._df, chunk])
except AttributeError:
self._df = chunk
self._df.columns = columns
count = sum(self._df.taxon == 1)
percentage = count / len(self._df) * 100
if percentage >= 1:
logger.warning(
"Found {} taxons of classified reads with root ID (1) ({} %)".format(count, round(percentage, 2))
)
# This gives the list of taxons as index and their amount
# above, we select only columns 0, 2, 3 the column are still labelled
# 0, 2, 3 in the df
self._taxons = self._df.groupby("taxon").size()
try:
self._taxons.drop(0, inplace=True)
except:
pass # 0 may not be there
self._taxons.sort_values(ascending=False, inplace=True)
category = self.df.groupby("status").size()
if "C" in category.index:
self.classified = category["C"]
else:
self.classified = 0
if "U" in category.index:
self.unclassified = category["U"]
else:
self.unclassified = 0
logger.debug(self.taxons.iloc[0:10])
def _get_taxons(self):
try:
return self._taxons
except:
self._parse_data()
return self._taxons
taxons = property(_get_taxons)
def _get_df(self):
try:
return self._df
except:
self._parse_data()
return self._df
df = property(_get_df)
def _get_df_with_taxon(self, dbname):
    """Return a dataframe with taxonomy, count and percentage columns.

    :param dbname: not used in the body; kept for API compatibility with
        the public callers (kraken_to_csv / kraken_to_json).

    An extra "Unclassified" row (taxon -1) accounts for unclassified reads.
    Rows are sorted by decreasing percentage.
    """
    df = self.get_taxonomy_db([int(x) for x in self.taxons.index])
    df["count"] = self.taxons.values
    df.reset_index(inplace=True)

    # append one row for the unclassified reads, using -1 as taxon ID
    newrow = len(df)
    df.loc[newrow] = "Unclassified"
    df.loc[newrow, "count"] = self.unclassified
    df.loc[newrow, "index"] = -1
    df.rename(columns={"index": "taxon"}, inplace=True)

    try:
        df["percentage"] = df["count"] / df["count"].sum() * 100
    except ZeroDivisionError:
        # NOTE(review): pandas division by zero yields NaN/inf rather than
        # raising ZeroDivisionError, so this branch looks unreachable --
        # verify before relying on it.
        df["percentage"] = 0

    # put taxon/count/percentage first, keep the remaining columns after
    starter = ["taxon", "count", "percentage"]
    df = df[starter + [x for x in df.columns if x not in starter]]

    df.sort_values(by="percentage", inplace=True, ascending=False)
    return df
def kraken_to_csv(self, filename, dbname):
    """Save the taxonomy/count dataframe into a CSV file.

    :param filename: output CSV filename
    :param dbname: forwarded to :meth:`_get_df_with_taxon`
    :return: the saved dataframe
    """
    df = self._get_df_with_taxon(dbname)
    df.to_csv(filename, index=False)
    return df
def kraken_to_json(self, filename, dbname):
    """Save the taxonomy/count dataframe into a JSON file.

    :param filename: output JSON filename
    :param dbname: forwarded to :meth:`_get_df_with_taxon`
    :return: the saved dataframe
    """
    df = self._get_df_with_taxon(dbname)
    try:
        df.to_json(filename, indent=4, orient="records")
    except TypeError:
        # presumably older pandas versions without the *indent* argument
        df.to_json(filename, orient="records")
    return df
[docs] def kraken_to_krona(self, output_filename=None, nofile=False):
"""
:return: status: True is everything went fine otherwise False
"""
if output_filename is None:
output_filename = str(self.filename) + ".summary"
taxon_to_find = list(self.taxons.index)
if len(taxon_to_find) == 0:
logger.warning("No reads were identified. You will need a more complete database")
self.output_filename = output_filename
with open(output_filename, "w") as fout:
fout.write("%s\t%s" % (self.unclassified, "Unclassified"))
return False
if len(taxon_to_find) == 0:
return False
df = self.get_taxonomy_db(taxon_to_find)
self.lineage = [";".join(this) for this in df[df.columns[0:-1]].values]
self.scnames = list(df["name"].values) # do we need a cast ?
# Now save the file
self.output_filename = output_filename
with open(output_filename, "w") as fout:
for i, this in enumerate(self.lineage):
taxon = taxon_to_find[i]
count = self.taxons.loc[taxon]
line = str(count) + "\t" + "\t".join(this.split(";"))
line += " " + self.scnames[i]
fout.write(line + "\n")
try:
fout.write("%s\t%s" % (self.unclassified, "Unclassified"))
except:
pass # unclassified may not exists if all classified
self._data_created = True
return True
def plot2(self, kind="pie", fontsize=12):
    """This is the simplified static krona-like plot included in HTML reports.

    Nested pie chart: kingdom (inner ring), class (middle), species (outer).

    :param kind: unused in the body; kept for API compatibility
    :param fontsize: legend font size
    :return: the taxonomy dataframe used for the plot, or None if no taxon
        was found
    """
    import matplotlib.pyplot as plt

    taxons = self.taxons.copy()
    if len(self.taxons.index) == 0:
        return None

    df = self.get_taxonomy_db(list(self.taxons.index))
    self.dd = df
    if self.unclassified > 0:
        # add a fake taxon (-1) so unclassified reads appear in the plot
        df.loc[-1] = ["Unclassified"] * 8
        taxons[-1] = self.unclassified
    df["ratio"] = taxons / taxons.sum() * 100

    data_class = df.groupby(["kingdom", "class"]).sum()
    data_species = df.groupby(["kingdom", "species"]).sum()

    X = []  # kingdom ratios (inner ring)
    Y = []  # class ratios (middle ring)
    Z = []  # species ratios (outer ring)
    labels = []
    zlabels = []
    kingdom_colors = []
    inner_colors = []
    inner_labels = []
    species_colors = []

    taxons = df["species"].reset_index().set_index("species")

    for kingdom in data_class.index.levels[0]:
        # kingdom info
        X.append(data_class.loc[kingdom].ratio.sum())

        # class info; only entries >= 0.5% get a visible label
        temp = data_class.loc[kingdom]
        y1 = temp.query("ratio>=0.5")
        y2 = temp.query("ratio<0.5")
        y = list(y1.ratio.values) + list(y2.ratio.values)
        inner_labels += list(y1.ratio.index) + [""] * len(y2.ratio)
        Y.extend(y)

        # species info; same 0.5% labelling threshold
        temp = data_species.loc[kingdom]
        z1 = temp.query("ratio>=0.5")
        z2 = temp.query("ratio<0.5")
        z = list(z1.ratio.values) + list(z2.ratio.values)
        zlabels += list(z1.ratio.index) + [""] * len(z2.ratio)
        Z.extend(z)

        if kingdom.strip():
            labels.append(kingdom)
        else:
            labels.append("undefined/unknown taxon")

        # one colormap per kingdom so the three rings share hues
        if kingdom == "Eukaryota":
            this_cmap = plt.cm.Purples
        elif kingdom == "Unclassified":
            this_cmap = plt.cm.Greys
        elif kingdom == "Bacteria":
            this_cmap = plt.cm.Reds
        elif kingdom == "Viruses":
            this_cmap = plt.cm.Greens
        elif kingdom == "Archaea":
            this_cmap = Colormap().cmap_linear("yellow", "yellow", "orange")
        else:
            this_cmap = Colormap().cmap_linear("light gray", "gray(w3c)", "dark gray")

        kingdom_colors.append(this_cmap(0.8))
        inner_colors.extend(this_cmap(np.linspace(0.6, 0.2, len(y))))
        species_colors.extend(this_cmap(np.linspace(0.6, 0.2, len(z))))

    fig, ax = pylab.subplots(figsize=(9.5, 7))
    size = 0.2

    w1, l1 = ax.pie(
        X,
        radius=1 - 2 * size,
        colors=kingdom_colors,
        wedgeprops=dict(width=size, edgecolor="w"),
        labels=labels,
        labeldistance=0.4,
    )

    w2, l2 = ax.pie(
        Y,
        radius=1 - size,
        colors=inner_colors,
        labels=[x.replace("Unclassified", "") for x in inner_labels],
        wedgeprops=dict(width=size, edgecolor="w"),
        labeldistance=0.65,
    )

    # labels can be long. Let us cut them
    zlabels2 = []
    for this in zlabels:
        if len(this) > 30:
            zlabels2.append(this[0:30] + "...")
        else:
            zlabels2.append(this)

    w3, l3 = ax.pie(
        Z,
        radius=1,
        colors=species_colors,
        labels=[x.replace("Unclassified", "") for x in zlabels2],
        wedgeprops=dict(width=size, edgecolor="w"),
        labeldistance=0.9,
    )

    ax.set(aspect="equal")
    pylab.subplots_adjust(right=1, left=0, bottom=0, top=1)
    pylab.legend(labels, title="kingdom", loc="upper right", fontsize=fontsize)

    import webbrowser

    mapper = {k: v for k, v in zip(zlabels, Z)}

    def on_pick(event):
        # clicking a species wedge opens its NCBI taxonomy page
        wedge = event.artist
        label = wedge.get_label()
        if mapper[label] > 1:
            taxon = taxons.loc[label, "index"]
            webbrowser.open("https://www.ncbi.nlm.nih.gov/Taxonomy/Browser/wwwtax.cgi?id={}".format(taxon))
        else:
            wedge.set_color("white")

    for wedge in w3:
        wedge.set_picker(True)
    fig.canvas.mpl_connect("pick_event", on_pick)

    # this is used to check that everything was okay in the rules
    return df
def plot(
    self,
    kind="pie",
    cmap="tab20c",
    threshold=1,
    radius=0.9,
    textcolor="red",
    delete_krona_file=False,
    **kargs,
):
    """A simple non-interactive plot of taxons

    :param kind: "pie" or "barh"
    :param cmap: a matplotlib colormap name
    :param threshold: percentage below which taxons are gathered into
        a single "others" entry
    :param radius: radius of the pie
    :param textcolor: color of the percentage labels (pie only)
    :param delete_krona_file: remove the intermediate krona summary file
    :return: None if no taxon were found and a dataframe otherwise

    A Krona Javascript output is also available in :meth:`kraken_to_krona`

    .. plot::
        :include-source:

        from sequana import KrakenResults, sequana_data
        test_file = sequana_data("kraken.out", "doc")
        k = KrakenResults(test_file)
        df = k.plot(kind='pie')

    .. seealso:: to generate the data see :class:`KrakenPipeline`
        or the standalone application **sequana_taxonomy**.

    .. todo:: For a future release, we could use this kind of plot
        https://stackoverflow.com/questions/57720935/how-to-use-correct-cmap-colors-in-nested-pie-chart-in-matplotlib
    """
    if len(self._df) == 0:
        return

    if not self._data_created:
        self.kraken_to_krona()

    if kind not in ["barh", "pie"]:
        logger.error("kind parameter: Only barh and pie are supported")
        return

    # This may have already been called but maybe not. This is not time
    # consuming, so we call it again here
    if len(self.taxons.index) == 0:
        return None

    df = self.get_taxonomy_db(list(self.taxons.index))

    if self.unclassified > 0:
        df.loc[-1] = ["Unclassified"] * 8

    data = self.taxons.copy()
    # we add the unclassified only if needed
    if self.unclassified > 0:
        data.loc[-1] = self.unclassified

    data = data / data.sum() * 100
    assert threshold > 0 and threshold < 100

    # everything below the threshold is gathered together and summarised
    # into 'others'
    others = data[data < threshold].sum()
    data = data[data >= threshold]
    names = df.loc[data.index]["name"]

    data.index = names.values

    if others > 0:
        data.loc["others"] = others

    try:
        data.sort_values(inplace=True)
    except AttributeError:
        # presumably for very old pandas versions that used sort()
        data.sort(inplace=True)

    pylab.figure(figsize=(10, 8))
    pylab.clf()
    self.dd = data
    if kind == "pie":
        ax = data.plot(kind=kind, cmap=cmap, autopct="%1.1f%%", radius=radius, **kargs)
        pylab.ylabel(" ")
        for text in ax.texts:
            # large, x-small, small, None, x-large, medium, xx-small,
            # smaller, xx-large, larger
            text.set_size("small")
            text.set_color(textcolor)
        for wedge in ax.patches:
            wedge.set_linewidth(1)
            wedge.set_edgecolor("k")
        self.ax = ax
    elif kind == "barh":
        ax = data.plot(kind=kind, **kargs)
        pylab.xlabel(" percentage ")

    if delete_krona_file:
        os.remove(self.filename + ".summary")

    return data
def to_js(self, output="krona.html"):
    """Create the Krona HTML page with the ktImportText executable.

    :param output: output HTML filename

    Exits (sys.exit) if ktImportText is not installed.
    """
    if not shutil.which("ktImportText"):  # pragma: no cover
        logger.error(
            "ktImportText executable not found. Please install it with conda or the method of your choice. We also provide ktImportText within damona.readthedocs.io "
        )
        sys.exit(1)

    # make sure the krona summary file exists
    if not self._data_created:
        self.kraken_to_krona()
    shell("ktImportText %s -o %s" % (self.output_filename, output))
def boxplot_classified_vs_read_length(self):
    """Show distribution of the read length grouped by classified or not"""
    # if paired and kraken2, there are | in length to separate both reads.
    # to simplify, if this is the case, we will just take the first read
    # length for now.
    df = self.df.copy()
    try:  # kraken2 paired data stores length as e.g. "151|151"
        df.length = df.length.apply(lambda x: int(x.split("|")[0]))
    except AttributeError:
        # lengths were already parsed as integers (no "|" separator)
        pass
    df[["status", "length"]].groupby("status").boxplot()
    return df
def histo_classified_vs_read_length(self):
    """Show distribution of the read length grouped by classified or not"""
    # if paired and kraken2, there are | in length to separate both reads.
    # to simplify, if this is the case, we will just take the first read
    # length for now.
    df = self.df.copy()
    if "|" in str(df.length.values[0]):
        df.length = df.length.apply(lambda x: int(x.split("|")[0]))
    df = df[["status", "length"]]

    M = df["length"].max()
    # one histogram per status (classified/unclassified), shared y-axis
    df.hist(by="status", sharey=True, bins=pylab.linspace(0, M, int(M / 5)))
    axes = pylab.gcf().get_axes()
    axes[0].set_xlabel("read length")
    axes[1].set_xlabel("read length")
    axes[1].grid(True)
    axes[0].grid(True)
    return df
[docs]class KrakenPipeline(object):
"""Used by the standalone application sequana_taxonomy
This runs Kraken on a set of FastQ files, transform the results
in a format compatible for Krona, and creates a Krona HTML report.
::
from sequana import KrakenPipeline
kt = KrakenPipeline(["R1.fastq.gz", "R2.fastq.gz"], database="krakendb")
kt.run()
kt.show()
.. warning:: We do not provide Kraken database within sequana. You may
either download a database from https://ccb.jhu.edu/software/kraken/
or use this class to download a toy example that will
be stored in e.g .config/sequana under Unix platforms.
See :class:`~sequana.kraken.downloads.KrakenDownload`.
.. seealso:: We provide a standalone application of this class, which is
called sequana_taxonomy and can be used within a command shell.
"""
def __init__(
    self,
    fastq,
    database,
    threads=4,
    output_directory="kraken",
    dbname=None,
    confidence=0,
):
    """.. rubric:: Constructor

    :param fastq: either a fastq filename or a list of 2 fastq filenames
    :param database: the path to a valid Kraken database
    :param threads: number of threads to be used by Kraken
    :param output_directory: output filename of the Krona HTML page
    :param dbname: name reported in the summary (defaults to the database
        basename)
    :param confidence: kraken2 confidence parameter

    Description: internally, once Kraken has performed an analysis, reads
    are associated to a taxon (or not). We then find the correponding
    lineage and scientific names to be stored within a Krona formatted file.
    KtImportTex is then used to create the Krona page.
    """
    # Set and create output directory. Stored as a Path since run()
    # builds output filenames with the / operator.
    self.output_directory = Path(output_directory)
    os.makedirs(output_directory, exist_ok=True)
    self.database = database

    self.ka = KrakenAnalysis(fastq, database, threads, confidence=confidence)

    if dbname is None:
        self.dbname = os.path.basename(database)
    else:
        self.dbname = dbname
def run(
    self,
    output_filename_classified=None,
    output_filename_unclassified=None,
    only_classified_output=False,
):
    """Run the analysis using Kraken and create the Krona output

    :param output_filename_classified: filename for classified reads
    :param output_filename_unclassified: filename for unclassified reads
    :param only_classified_output: keep only classified reads (lines
        starting with "C") in the kraken output file
    :return: a summary dictionary with classified/unclassified counts

    .. todo:: reuse the KrakenResults code to simplify this method.
    """
    if not shutil.which("ktImportText"):  # pragma: no cover
        logger.error(
            "ktImportText executable not found. Please install it with conda or the method of your choice. We also provide ktImportText within damona.readthedocs.io "
        )
        sys.exit(1)

    # Run Kraken (KrakenAnalysis)
    kraken_results = self.output_directory / "kraken.out"

    self.ka.run(
        output_filename=kraken_results,
        output_filename_unclassified=output_filename_unclassified,
        output_filename_classified=output_filename_classified,
        only_classified_output=only_classified_output,
    )

    # Translate kraken output to a format understood by Krona and save png
    # image
    self.kr = KrakenResults(kraken_results, verbose=False)

    # we save the pie chart; fall back on the simple plot if the nested
    # krona-like plot fails
    try:
        self.kr.plot2(kind="pie")
    except Exception as err:
        logger.warning(err)
        self.kr.plot(kind="pie")
    pylab.savefig(self.output_directory / "kraken.png")

    # we save information about the unclassified reads (length)
    try:
        self.kr.boxplot_classified_vs_read_length()
        pylab.savefig(self.output_directory / "boxplot_read_length.png")
    except Exception as err:
        logger.warning("boxplot read length could not be computed")
    try:
        self.kr.histo_classified_vs_read_length()
        pylab.savefig(self.output_directory / "hist_read_length.png")
    except Exception as err:
        logger.warning("hist read length could not be computed")

    prefix = self.output_directory
    self.kr.kraken_to_json(prefix / "kraken.json", self.dbname)
    self.kr.kraken_to_csv(prefix / "kraken.csv", self.dbname)

    # Transform to Krona HTML
    kraken_html = self.output_directory / "kraken.html"
    status = self.kr.kraken_to_krona(output_filename=prefix / "kraken.out.summary")
    if status is True:
        shell("ktImportText %s -o %s" % (prefix / "kraken.out.summary", kraken_html))
    else:
        # no classified reads at all: create an empty page
        shell("touch {}".format(kraken_html))

    # finally a summary
    database = KrakenDB(self.database)

    summary = {"database": [database.name]}
    summary[database.name] = {"C": int(self.kr.classified)}
    summary["U"] = int(self.kr.unclassified)
    summary["total"] = int(self.kr.unclassified + self.kr.classified)
    # redundant but useful and compatible with sequential approach
    summary["unclassified"] = int(self.kr.unclassified)
    summary["classified"] = int(self.kr.classified)

    return summary
def show(self):
    """Open the Krona HTML page created by :meth:`run` in a browser."""
    from easydev import onweb

    # the previous code referenced self.output, which is never set
    # anywhere in this class; open the HTML page written by run() instead
    onweb(self.output_directory / "kraken.html")
[docs]class KrakenAnalysis(object):
"""Run kraken on a set of FastQ files
In order to run a Kraken analysis, we firtst need a local database.
We provide a Toy example. The ToyDB is downloadable as follows ( you will
need to run the following code only once)::
from sequana import KrakenDownload
kd = KrakenDownload()
kd.download_kraken_toydb()
.. seealso:: :class:`KrakenDownload` for more databases
The path to the database is required to run the analysis. It has been
stored in the directory ./config/sequana/kraken_toydb under Linux platforms
The following code should be platform independent::
import os
from sequana import sequana_config_path
database = sequana_config_path + os.sep + "kraken_toydb")
Finally, we can run the analysis on the toy data set::
from sequana import sequana_data
data = sequana_data("Hm2_GTGAAA_L005_R1_001.fastq.gz", "data")
ka = KrakenAnalysis(data, database=database)
ka.run()
This creates a file named *kraken.out*. It can be interpreted with
:class:`KrakenResults`
"""
def __init__(self, fastq, database, threads=4, confidence=0):
    """.. rubric:: Constructor

    :param fastq: either a fastq filename or a list of 2 fastq filenames
    :param database: the path to a valid Kraken database
    :param threads: number of threads to be used by Kraken
    :param confidence: parameter used by kraken2
    """
    self.database = KrakenDB(database)
    self.threads = threads
    self.confidence = confidence

    # Fastq input: a single file (single-end) or a list of 1 or 2 files
    # (Path covers PosixPath/WindowsPath, keeping the previous behaviour)
    if isinstance(fastq, (str, Path)):
        self.paired = False
        self.fastq = [fastq]
    elif isinstance(fastq, list):
        if len(fastq) == 2:
            self.paired = True
        elif len(fastq) == 1:
            self.paired = False
        else:
            raise IOError(("You must provide 1 or 2 files"))
        self.fastq = fastq
    else:
        # type information folded into the message (was a debug print)
        raise ValueError(f"Expected a fastq filename or list of 2 fastq filenames Got {fastq} of type {type(fastq)}")
def run(
    self,
    output_filename=None,
    output_filename_classified=None,
    output_filename_unclassified=None,
    only_classified_output=False,
):
    """Performs the kraken analysis

    :param str output_filename: if not provided, a temporary file is used
        and stored in :attr:`kraken_output`.
    :param str output_filename_classified: not compressed
    :param str output_filename_unclassified: not compressed
    :param only_classified_output: if True, rewrite the kraken output file
        keeping only the classified ("C") lines
    """
    if not shutil.which("kraken2"):  # pragma: no cover
        logger.error(
            "kraken2 executable not found. Please install it with conda or the method of your choice. We also provide kraken2 within damona.readthedocs.io "
        )
        sys.exit(1)

    if self.database.version != "kraken2":
        logger.error(f"input database is not valid kraken2 database")
        sys.exit(1)

    if output_filename is None:
        self.kraken_output = TempFile().name
    else:
        self.kraken_output = output_filename
        # guard against bare filenames (dirname == "") before makedirs
        dirname = os.path.dirname(output_filename)
        if dirname:
            os.makedirs(dirname, exist_ok=True)

    # make sure the required output directories exist:
    # and that the output filenames ends in .fastq
    if output_filename_classified:
        assert output_filename_classified.name.endswith(".fastq")
        dirname = os.path.dirname(output_filename_classified)
        if dirname:
            os.makedirs(dirname, exist_ok=True)

    if output_filename_unclassified:
        assert output_filename_unclassified.name.endswith(".fastq")
        dirname = os.path.dirname(output_filename_unclassified)
        if dirname:
            os.makedirs(dirname, exist_ok=True)

    params = {
        "database": self.database.path,
        "thread": self.threads,
        "file1": self.fastq[0],
        "kraken_output": self.kraken_output,
        "output_filename_unclassified": output_filename_unclassified,
        "output_filename_classified": output_filename_classified,
    }

    if self.paired:
        params["file2"] = self.fastq[1]

    command = f"kraken2 --confidence {self.confidence}"
    command += f" {params['file1']}"
    if self.paired:
        command += f" {params['file2']} --paired"
    command += f" --db {params['database']} "
    command += f" --threads {params['thread']} "
    command += f" --output {params['kraken_output']} "

    # If N is number of reads unclassified 3 cases depending on out-fmt
    # choice
    # case1 --paired and out-fmt legacy saved fasta R1 and R2 together on N lines
    # case2 --paired and out-fmt interleaved saved fasta R1 and R2 alternatively on 2N lines
    # case3 --paired and out-fmt paired saved R1 on N lines. Where is R2 ????
    # Note, that there is always one single file. So, the only way for
    # kraken to know that this new files (used as input) is paired, is to
    # use --paired.
    # In any case, this new file looks like an R1-only file. Indeed, if
    # interleaved, all data inside the file, if legacy, The R1 and R2 are
    # separated by N but a unique sequence. If --out-fmt is paired, this is
    # annoying. Indeed, half of the data is lost.
    # So, if now input is
    # case1, we cannot provide --paired
    # case2 we cannot either, so how are R1 and R2 taken care of ?
    # besides, if provided, the interleaved input is seen as single ended.
    # Indeed, if provided, --out-fmt cannot be interleaved since krakne1
    # complains that input is not paired.
    # case3, only R1 so we cannot use --paired
    # if kraken2, there is no --out-fmt option, so output is always a fastq
    # with either R1 only or two output files.
    # If we omit the --paired options, the 2 input R1 and R2 are considered
    # as 2 different unrelated samples
    # if we use --paired we now must have # in the file name, and then
    # the two files are created
    if self.database.version == "kraken2":
        if output_filename_unclassified:
            command += " --unclassified-out %(output_filename_unclassified)s "
        if output_filename_classified:
            command += " --classified-out %(output_filename_classified)s "

    command = command % params
    logger.debug(command)
    shell(command)

    if only_classified_output:
        # kraken2 has no classified_output option. we mimic it here below
        # by rewriting the output keeping only lines starting with "C"
        fout = TempFile()
        outname = fout.name
        with open(outname, "w") as newfile:
            # if the FastQ file is empty, there is no output file
            if os.path.exists(output_filename):
                with open(output_filename, "r") as fin:
                    for line in fin:
                        if line.startswith("C"):
                            newfile.write(line)
        shutil.move(outname, output_filename)
# a simple utility function. The file requires Python 3 (f-strings), where
# itertools provides zip_longest; the izip_longest alias is kept for
# backward compatibility.
from itertools import zip_longest as izip_longest


def grouper(iterable):
    """Collect *iterable* into tuples of 8 items, padding the last with None."""
    args = [iter(iterable)] * 8
    return izip_longest(*args)
[docs]class KrakenSequential(object):
"""Kraken Sequential Analysis
This runs Kraken on a FastQ file with multiple k-mer databases in a
sequencial way way. Unclassified sequences with the first database are input
for the second, and so on.
The input may be a single FastQ file or paired, gzipped or not. FastA are
also accepted.
"""
def __init__(
    self,
    filename_fastq,
    fof_databases,
    threads=1,
    output_directory="./kraken_sequential/",
    keep_temp_files=False,
    output_filename_unclassified=None,
    output_filename_classified=None,
    force=False,
    confidence=0,
):
    """.. rubric:: **constructor**

    :param filename_fastq: FastQ file to analyse
    :param fof_databases: file that contains a list of databases paths
        (one per line). The order is important. Note that you may also
        provide a list of database paths.
    :param threads: number of threads to be used by Kraken
    :param output_directory: name of the output directory
    :param keep_temp_files: bool, if True, will keep intermediate files
        from each Kraken analysis, and save html report at each step
    :param bool force: if the output directory already exists, the
        instanciation fails so that the existing data is not overwritten.
        If you wish to overwrite the existing directory, set this
        parameter to True.
    :param confidence: kraken2 confidence parameter
    """
    self.filename_fastq = filename_fastq
    self.confidence = confidence

    # input databases may be stored in a file (one path per line);
    # strip() also removes trailing \r on files edited under Windows
    if isinstance(fof_databases, str) and os.path.exists(fof_databases):
        with open(fof_databases, "r") as fof:
            self.databases = [absolute_path.strip() for absolute_path in fof.readlines()]
    # or simply provided as a list
    elif isinstance(fof_databases, (list, tuple)):
        self.databases = list(fof_databases)
    else:
        raise TypeError(
            "input databases must be a list of valid kraken2 " "databases or a file (see documentation)"
        )

    self.databases = [KrakenDB(x) for x in self.databases]

    for d in self.databases:
        if d.version != "kraken2":
            logger.error(f"input database {d} is not valid kraken2 ")
            sys.exit(1)

    self.threads = threads
    # stored as Path since _run_one_analysis builds filenames with /
    self.output_directory = Path(output_directory)
    self.keep_temp_files = keep_temp_files

    # check if the output directory already exist
    try:
        os.mkdir(output_directory)
    except OSError:
        if os.path.isdir(output_directory) and force is False:
            logger.error("Output directory %s already exists" % output_directory)
            raise Exception
        elif force is True:
            logger.warning(
                "Output directory %s already exists. You may " "overwrite existing results" % output_directory
            )

    # list of input fastq files
    if isinstance(filename_fastq, list) and len(filename_fastq) in [1, 2]:
        self.inputs = filename_fastq[:]
    elif isinstance(filename_fastq, str):
        self.inputs = [filename_fastq]
    else:
        msg = "input file must be a string or list of 2 filenames"
        msg += "\nYou provided {}".format(filename_fastq)
        raise TypeError(msg)

    if len(self.inputs) == 1:
        self.paired = False
    elif len(self.inputs) == 2:
        self.paired = True

    self.unclassified_output = output_filename_unclassified
    self.classified_output = output_filename_classified
def _run_one_analysis(self, iteration):
    """Run a single kraken2 pass against database number *iteration*.

    Appends the pass's unclassified-fastq filename(s) to
    ``self._list_kraken_input`` and its kraken output filename to
    ``self._list_kraken_output``.
    """
    db = self.databases[iteration]
    logger.info("Analysing data using database {}".format(db))

    outdir = self.output_directory

    # Destination of the unclassified reads produced by this pass.
    if self.paired:
        # For paired data, kraken2 expects a '#' in the template and creates
        # two files (1 and 2); kraken itself inserts the '_' before the
        # 1/2 suffix, so none is added here.
        output_filename_unclassified = outdir / f"unclassified_{iteration}#.fastq"
        file_fastq_unclass = [
            outdir / f"unclassified_{iteration}_1.fastq",
            outdir / f"unclassified_{iteration}_2.fastq",
        ]
    else:
        output_filename_unclassified = outdir / f"unclassified_{iteration}.fastq"
        file_fastq_unclass = outdir / f"unclassified_{iteration}.fastq"

    # First pass reads the user-provided input; subsequent passes read the
    # unclassified reads left over by the previous database.
    inputs = self.inputs if iteration == 0 else self._list_kraken_input[iteration - 1]

    # Only the last pass (even when there is a single database) keeps both
    # classified and unclassified reads in the final kraken results.
    only_classified_output = iteration != len(self.databases) - 1

    file_kraken_out = outdir / f"kraken_{iteration}.out"

    # The analysis itself
    analysis = KrakenAnalysis(inputs, db, self.threads, confidence=self.confidence)
    analysis.run(
        output_filename=file_kraken_out,
        output_filename_unclassified=output_filename_unclassified,
        only_classified_output=only_classified_output,
    )

    # Book-keeping for the next iteration and the final merge.
    self._list_kraken_input.append(file_fastq_unclass)
    self._list_kraken_output.append(file_kraken_out)
def run(self, dbname="multiple", output_prefix="kraken_final"):
    """Run the sequential analysis.

    :param dbname: name stored in the final JSON/CSV summaries
    :param output_prefix: prefix of the final merged kraken/krona files
    :return: dictionary summarizing the databases names and
        classified/unclassified read counts

    This method also creates a set of files:

    - kraken_final.out
    - krona_final.html
    - kraken.png (pie plot of the classified/unclassified reads)

    .. note:: the databases are run in the order provided in the constructor.
    """
    # list of all output to merge at the end
    self._list_kraken_output = []
    self._list_kraken_input = []

    # Iteration over the databases
    for iteration in range(len(self.databases)):
        # The analysis itself; it appends this pass's filenames to
        # self._list_kraken_input / self._list_kraken_output.
        self._run_one_analysis(iteration)
        last_unclassified = self._list_kraken_input[-1]

        # If everything was classified, we can stop here
        try:  # handle special case of empty FastQ file
            try:
                # single-end: one unclassified file
                if os.stat(last_unclassified).st_size == 0:
                    break
            except TypeError:
                # paired-end: a list of two files; checking the first is enough
                if os.stat(last_unclassified[0]).st_size == 0:
                    break
        except FileNotFoundError:
            break

    # concatenate all kraken output files into the final one
    file_output_final = self.output_directory / f"{output_prefix}.out"
    with open(file_output_final, "w") as outfile:
        for fname in self._list_kraken_output:
            with open(fname) as infile:
                for line in infile:
                    outfile.write(line)

    logger.info("Analysing final results")
    result = KrakenResults(file_output_final, verbose=False)

    # best-effort plots: a plotting failure must not abort the analysis
    try:
        result.histo_classified_vs_read_length()
        pylab.savefig(self.output_directory / "hist_read_length.png")
    except Exception as err:
        logger.warning(f"hist read length could not be computed ({err})")

    try:
        result.boxplot_classified_vs_read_length()
        pylab.savefig(self.output_directory / "boxplot_read_length.png")
    except Exception as err:
        # original message wrongly referred to the histogram here
        logger.warning(f"boxplot read length could not be computed ({err})")

    # TODO: this looks similar to the code in KrakenPipeline. could be factorised
    result.to_js("%s.html" % (self.output_directory / output_prefix))
    try:
        result.plot2(kind="pie")
    except Exception as err:
        logger.warning(err)
        result.plot(kind="pie")
    pylab.savefig(self.output_directory / "kraken.png")

    prefix = self.output_directory
    result.kraken_to_json(prefix / "kraken.json", dbname)
    result.kraken_to_csv(prefix / "kraken.csv", dbname)

    self._copy_final_reads()
    summary = self._build_summary()

    # remove kraken intermediate files (including unclassified files)
    if not self.keep_temp_files:
        self._remove_temp_files()
    return summary

def _copy_final_reads(self):
    """Copy the last (un)classified files to the user-requested locations."""
    last = self._list_kraken_input[-1] if self._list_kraken_input else None
    if self.unclassified_output and last is not None:
        if isinstance(last, list):
            # paired-end data: replace the '#' placeholder by 1 and 2
            for i, x in enumerate(last):
                shutil.copy2(x, self.unclassified_output.replace("#", str(i + 1)))
        else:
            # single-end data (one file)
            shutil.copy2(last, self.unclassified_output)
    if self.classified_output and last is not None:
        # NOTE(review): this copies the last *unclassified* file, kept from
        # the original implementation -- confirm whether the classified
        # reads were intended here.
        shutil.copy2(last, self.classified_output)

def _build_summary(self):
    """Build the per-database classified/unclassified read-count summary."""
    summary = {"databases": [x.name for x in self.databases]}
    total = 0
    classified = 0
    for f_temp, db in zip(self._list_kraken_output, self.databases):
        # In theory, the first N-1 DBs return only classified (C) reads
        # and the last one contains both C and U
        try:
            df = pd.read_csv(f_temp, sep="\t", header=None, usecols=[0])
            C = sum(df[0] == "C")
            U = sum(df[0] == "U")
        except pd.errors.EmptyDataError:
            # if no read classified,
            C = 0
            U = 0
        total += U + C
        classified += C
        summary[db.name] = {"C": C}
        if U != 0:  # the last one
            summary["unclassified"] = U
    summary["total"] = total
    summary["classified"] = classified
    return summary

def _remove_temp_files(self):
    """Remove intermediate kraken outputs and unclassified fastq files."""
    # kraken_0.out, kraken_1.out, ...
    for f_temp in self._list_kraken_output:
        os.remove(f_temp)
    # unclassified reads; entries are a single path (single-end) or a list
    # of two paths (paired-end). Note: entries are pathlib.Path objects, so
    # we must not test isinstance(..., str) here (the original did, and
    # therefore never removed single-end temporary files).
    for f_temp in self._list_kraken_input:
        paths = f_temp if isinstance(f_temp, list) else [f_temp]
        for this in paths:
            try:
                os.remove(this)
            except FileNotFoundError:
                pass