#
# This file is part of Sequana software
#
# Copyright (c) 2016-2022 - Sequana Development Team
#
#
# Distributed under the terms of the 3-clause BSD license.
# The full license is in the LICENSE file, distributed with this software.
#
# website: https://github.com/sequana/sequana
# documentation: http://sequana.readthedocs.io
#
##############################################################################
"""Utilities to access to online FASTA, taxon, lineage ..."""
import ftplib
import glob
import math
import os
import colorlog
from bioservices import ENA, EUtils
from easydev import AttrDict, Progress, execute
from sequana.lazy import pandas as pd
logger = colorlog.getLogger(__name__)
class ENADownload(object):
    """Downloader to retrieve genome fasta files from ENA amongst other things

    In order to facilitate the download of FASTA files (e.g. to build a Kraken
    DB), this class can be used to download a bunch of FASTA files, or just one
    given its accession.

    Some **OLD** pre-defined lists are available from ENA. We refer to them as *virus*,
    *plasmid*, *phage*, *archaealvirus*, *archaea*, *bacteria*, *organelle*,
    *viroid*.

    .. warning:: the header of the FASTA files are changed to add the GI number
        instead of embl so that it can be used by our kraken builder class.
    """

    def __init__(self):
        """.. rubric:: constructor"""
        # EUtilsTools wraps NCBI EUtils (accession -> GI lookup); it is
        # defined elsewhere in this module.
        self.eutils = EUtilsTools()
        self.ena = ENA(cache=True)

    def ena_id_to_gi_number(self, identifiers):
        """Convert ENA accession numbers into NCBI GI information.

        :param identifiers: list of ENA accession numbers
        :return: dictionary mapping each accession to its NCBI information
        """
        # Now, let us convert the ENA accession to NCBI GI number once for all.
        # NCBI EUtils can fetch only at max 200 identifiers per request, so we
        # split the query into chunks.
        logger.info("Fetching %s identifiers from NCBI" % len(identifiers))
        Nbaskets = int(math.ceil(len(identifiers) / 200.0))
        results = {}
        from easydev import split_into_chunks

        for chunk in split_into_chunks(identifiers, Nbaskets):
            result = self.eutils.accession_to_info(",".join(chunk))
            results.update(result)
        return results

    def download_fasta(self, filelist, output_dir=None):
        """Download a FASTA (or list of)

        :param filelist: a name to find on the ENA web server OR the
            name of an accession number or a file with accession numbers (1 column)
        :param output_dir: where to save the FASTA files (default: current
            directory; created if it does not exist)

        .. warning:: The filename is named after the accession without .X number
            If there are several variant .1, .2 the later will be used. This
            should not happen if the list is properly defined.
        """
        try:
            # input may be a file with one accession per line
            with open(filelist, "r") as fin:
                self._identifiers = [x.strip() for x in fin.readlines()]
        except (FileNotFoundError, TypeError):
            try:
                # or a list of accession numbers
                self._identifiers = filelist + []
            except TypeError:
                # or just one accession number
                self._identifiers = [filelist]
        self.results = self.ena_id_to_gi_number(self._identifiers)

        # do not use caching things this could be huge data sets.
        if output_dir is None:  # pragma: no cover
            output_dir = "."
        else:
            try:
                os.mkdir(output_dir)
            except FileExistsError:
                pass

        N = len(self._identifiers)
        pb = Progress(N)
        logger.info("Fetching all fasta from ENA")
        for i, identifier in enumerate(self._identifiers):
            # download data from ENA
            data = self.ena.get_data(identifier, "fasta")

            # Split header and Fasta
            try:
                header, others = data.split("\n", 1)
            except AttributeError:
                logger.warning(f"{identifier} not found on ENA website")
                continue

            # Source of failure:
            # - list and DB are not in sync: e.g. some entries may be deleted
            if "suppressed" in header:  # pragma: no cover
                continue

            # Do not use try/except since when it fails, this is a real issue
            name = header.strip(">").split(" ")[0]
            db, id_, acc = name.split("|")

            # NOTE(review): add_gi_to_header is not defined in this chunk;
            # presumably defined elsewhere in this class/module — confirm.
            header = self.add_gi_to_header(acc)

            # Save to local file
            # WARNINGS: extension is .fa because kraken-build expects .fa files
            filename = "%s_%s.fa" % (db, acc.split(".")[0])
            if output_dir:
                filename = output_dir + os.sep + filename
            with open(filename, "w") as fout:
                fout.write(header + "\n" + others)
            pb.animate(i + 1)
class NCBIDownload:
    """Download RefSeq releases and assembly reports from the NCBI FTP server."""

    def __init__(self):
        # categories available under refseq/release/ on the NCBI FTP server
        self.category = {
            "archaea",
            "bacteria",
            "fungi",
            "invertebrate",
            "mitochondrion",
            "other",
            "plant",
            "plasmid",
            "plastid",
            "protozoa",
            "vertebrate_mammalian",
            "vertebrate_other",
            "viral",
        }
        # subset of categories that also exist under genomes/refseq/
        # (used for assembly summary reports)
        self.category_genomes = {
            "archaea",
            "bacteria",
            "fungi",
            "invertebrate",
            "plant",
            "protozoa",
            "vertebrate_mammalian",
            "vertebrate_other",
            "viral",
        }

    def download_ncbi_refseq_release(self, category, email="sequana@pasteur.fr", outdir="."):
        """Download all files of type *fna* from ncbi FTP.

        :param str category: one of :attr:`category`
        :param str email: anonymous FTP password (courtesy identification)
        :param str outdir: directory where the files are saved
        :return: list of downloaded filenames

        ::

            kb = NCBIDownload()
            kb.download_ncbi_refseq_release("viral")
        """
        assert category in self.category, "Please use one of {}".format(self.category)
        ftp = ftplib.FTP("ftp.ncbi.nlm.nih.gov")
        ftp.login("anonymous", email)
        ftp.cwd(f"refseq/release/{category}")
        filenames = ftp.nlst()
        for filename in filenames:
            if "genomic.fna" in filename:
                logger.info(f"Downloading {filename}")
                # FIX: the destination path was built as filename/outdir
                # (components swapped); save under outdir instead. Use a
                # context manager so the file handle is always closed.
                with open(os.path.join(outdir, filename), "wb") as fout:
                    ftp.retrbinary(f"RETR {filename}", fout.write)
        return [x for x in filenames if "genomic.fna" in x]

    def download_genomes_from_ncbi(self, category, email="sequana@pasteur.fr"):  # pragma: no cover
        """This downloads all genomes on ncbi for a given category looking at
        their ftp. This could be highly redundant.

        :param str category: one of :attr:`category`
        :param str email: anonymous FTP password (courtesy identification)
        """
        assert category in self.category
        ftp = ftplib.FTP("ftp.ncbi.nlm.nih.gov")
        ftp.login("anonymous", email)
        ftp.cwd(f"refseq/release/{category}")
        for filename in ftp.nlst():
            if "genomic.fna" in filename:
                logger.info(f"Downloading {filename}")
                # close the output file deterministically
                with open(filename, "wb") as fout:
                    ftp.retrbinary(f"RETR {filename}", fout.write)

    def download_assembly_report(self, category, output=None):
        """Download the assembly_summary.txt report for a given category.

        :param str category: one of :attr:`category_genomes`
        :param output: local filename; defaults to
            ``assembly_summary_<category>.txt``
        """
        assert category in self.category_genomes
        ftp = ftplib.FTP("ftp.ncbi.nlm.nih.gov")
        ftp.login("anonymous", "anonymous")
        ftp.cwd(f"genomes/refseq/{category}")
        filename = output if output else f"assembly_summary_{category}.txt"
        logger.info(f"Downloading {filename}")
        with open(filename, "wb") as fout:
            ftp.retrbinary("RETR assembly_summary.txt", fout.write)
class NCBITaxonReader:
    """This class will help in reading, handling, simplifying NCBI taxonomic DB

    When downloading NCBI taxonomy DB using e.g. Kraken, we end up with very
    large files. One is called names.dmp and the other nodes.dmp. They may be
    introspected or simplified using this class

    The names.dmp is just a CSV file. The header looks like::

        1   |   all         |                           |   synonym         |
        1   |   root        |                           |   scientific name |
        2   |   Bacteria    |   Bacteria <prokaryote>   |   scientific name |
        2   |   Monera      |   Monera <Bacteria>       |   in-part         |
        2   |   Procaryotae |   Procaryotae <Bacteria>  |   in-part         |

    It is a tabulated file. If we ignore the | signs, it contains 4 columns::

        taxid
        name
        unique name
        type of name

    The *unique name* column is generally empty and is dropped internally.
    There are different types of *name*, so there can be several rows for
    a given *taxid*. For instance, for the taxon 1, there is a
    *scientific name* and a **synonym**

    The :attr:`df_name` is a dataframe that stores the taxid, name and type of
    name in a dataframe.

    The second file (nodes.dmp) looks like::

        1   |   1       |   no rank         |   |   8   |   0  |  1  |  0  |  0  |  0  |  0  |  0  |   |
        2   |   131567  |   superkingdom    |   |   0   |   0  |  11 |  0  |  0  |  0  |  0  |  0  |   |
        6   |   335928  |   genus           |   |   0   |   1  |  11 |  1  |  0  |  1  |  0  |  0  |   |
        7   |   6       |   species         | AC|   0   |   1  |  11 |  1  |  0  |  1  |  1  |  0  |   |
        9   |   32199   |   species         | BA|   0   |   1  |  11 |  1  |  0  |  1  |  1  |  0  |   |

    Again this is a tabulated file. The first three columns are taxid, parent taxid,
    and rank. Rank is species, genus, family, phylum, etc. Newest version of
    nodes.dmp has only 4 columns (taxid, parent taxid, rank, a dash)

    ::

        from sequana.databases import NCBITaxonReader

        # The first time you may want to download the taxdump files
        n = NCBITaxonReader()
        n.download_taxdump()
        n.init("names.dmp", "nodes.dmp")

        # next time, you can read it directly
        n = NCBITaxonReader("names.dmp", "nodes.dmp")
    """

    ftp_url = "ftp.ncbi.nih.gov"

    def __init__(self, names=None, nodes=None):
        """.. rubric:: Constructor

        :param str names: Defaults to "names.dmp".
        :param str nodes: Defaults to "nodes.dmp".
        """
        # Only parse when both files are provided and exist; otherwise the
        # user is expected to call download_taxdump() then init() manually.
        if (names and os.path.exists(names)) and (nodes and os.path.exists(nodes)):
            self.init(names, nodes)
        else:
            logger.warning("no input file provided or do not exist. Call download_taxdump() and init() manually")

    def init(self, names, nodes):
        """Read and parse the names.dmp and nodes.dmp files.

        :param str names: path to names.dmp
        :param str nodes: path to nodes.dmp
        """
        logger.info(f"Reading {names}")
        self.filename_names = names
        self.filename_nodes = nodes

        self.df_names = pd.read_csv(self.filename_names, sep="\t", header=None)
        # get rid of the pipe columns using ::2
        self.df_names = self.df_names.loc[:, ::2]
        self.df_names.rename({0: "taxon", 2: "name", 4: "nothing", 6: "scname"}, axis=1, inplace=True)

        # This will provide a faster lookup table to search for scientific
        # names given a taxon. We can drop rows that are not scientific names
        # and set the taxons as index
        _subdf = self.df_names.query("'scientific name' in scname")
        self._subdf = _subdf.set_index("taxon")

        # Here, this is for general purpose (slower if we were to use
        # this for the get_scientific_name method)
        self._group_name = self.df_names.groupby("taxon").groups

        logger.info(f"Reading {nodes}")
        self.df_nodes = pd.read_csv(self.filename_nodes, sep="\t", header=None)
        # get rid of the pipe columns using ::2
        self.df_nodes = self.df_nodes.loc[:, ::2]
        # only the first three columns are named; trailing columns vary with
        # the nodes.dmp version and keep numeric labels
        new_cols = ["taxon", "parent_taxon", "rank"] + list(range(0, len(self.df_nodes.columns) - 3))
        self.df_nodes.columns = new_cols

        self._df_nodes_taxon = self.df_nodes.copy()
        self._df_nodes_taxon.set_index("taxon", inplace=True)

    def download_taxdump(self, outpath="."):  # pragma: no cover
        """Download and uncompress the NCBI taxdump archive into *outpath*."""
        execute(f"wget {self.ftp_url}/pub/taxonomy/taxdump.tar.gz --directory-prefix {outpath}")
        execute(f"tar xvfz {outpath}/taxdump.tar.gz -C {outpath}")

    def get_number_taxon(self):
        """Return number of unique taxon"""
        return len(self.df_names["taxon"].unique())

    def get_average_name_per_taxon(self):
        """Return average number of rows/names per node/taxon"""
        from sequana.lazy import numpy as np

        return np.mean([len(values) for values in self._group_name.values()])

    def get_scientific_name(self, taxon):
        """Return scientific name of a given Taxon"""
        # _subdf is indexed on taxon and only contains 'scientific name' rows,
        # so a single .loc lookup suffices. Takes 2 minutes to scan all taxons
        return self._subdf.loc[taxon].values[0]

    def get_taxon_from_scientific_name(self, scname):
        """Return taxon(s) corresponding to a scientific name

        :return: a pandas Series of matching taxons (may be empty if none found)
        """
        res = self.df_names.query("@scname in name")["taxon"]
        return res

    def search(self, name):
        """Search the names column for entries containing *name*"""
        return self.df_names[self.df_names["name"].apply(lambda x: name in x)]

    def get_family(self, taxon):
        """Get all parent taxons of *taxon* (including the root taxon 1)"""
        taxons = [1]
        df = self._df_nodes_taxon
        final_links = (0, 1)
        while True:
            res = df.loc[taxon]
            taxons.append(taxon)
            taxon = res["parent_taxon"]
            # hopefully there is always a family link to 0 or 1
            if len(res) == 0 or taxon in final_links:
                break
        return taxons

    def _filter_dmp_file(self, input_filename, output, taxons):
        # Shared implementation for filter_nodes_dmp_file/filter_names_dmp_file:
        # keep only the lines of a .dmp file whose leading taxid belongs to the
        # family tree of one of the given taxons.
        all_taxons = set()
        for taxon in taxons:
            parents = self.get_family(taxon)
            all_taxons.update(parents)
        N = len(all_taxons)
        print(f"Saving {N} taxons")
        print(all_taxons)
        with open(input_filename, "r") as fin:
            with open(output, "w") as fout:
                for line in fin.readlines():
                    # the taxid is the first tab-separated field of each line
                    if int(line.split("\t", 1)[0]) in all_taxons:
                        fout.write(line)

    def filter_nodes_dmp_file(self, output="nodes_filtered.dmp", taxons=[]):
        """Save a subset of nodes.dmp given list of valid taxons

        :param str output: Defaults to "nodes_filtered.dmp".
        :param list taxons:
        """
        # In theory one line per taxon
        self._filter_dmp_file(self.filename_nodes, output, taxons)

    def filter_names_dmp_file(self, output="names_filtered.dmp", taxons=[]):
        """Save a subset of names.dmp given list of valid taxons

        :param str output: Defaults to "names_filtered.dmp".
        :param list taxons:
        """
        # we may have more entries than expected because a same taxon is used
        # for different species
        self._filter_dmp_file(self.filename_names, output, taxons)