A RetroSearch Logo

Home - News ( United States | United Kingdom | Italy | Germany ) - Football scores

Search Query:

Showing content from https://physcraper.readthedocs.io/en/main/_modules/physcraper/ncbi_data_parser.html below:

physcraper.ncbi_data_parser — Physcraper 0.1 documentation

"""uses ncbi databases to easily retrieve taxonomic information.

parts are altered from https://github.com/zyxue/ncbitax2lin/blob/master/ncbitax2lin.py
"""

import os
import sys
import pandas as pd
from physcraper.helpers import debug



_DEBUG = 0


[docs]def get_acc_from_blast(query_string):
    """
    Get the accession number from a blast query.
    :param query_string: string that contains acc and gi from local blast query result
    :return: gb_acc

    """
    if len(query_string.split("|")) >= 3:
        gb_acc = query_string.split("|")[3]
    else:
        gb_acc = query_string.split("|")[0]
    if len(gb_acc.split(".")) < 2:
        sys.stderr.write("query string {} does not contain a Genbank accession number. Skipping\n".format(query_string))
        return None
    assert len(gb_acc.split(".")) >= 2, (len(gb_acc.split(".")), gb_acc)
    return gb_acc

[docs]def get_gi_from_blast(query_string):
    """
    Get the gi number from a blast query.
    Get acc is more difficult now, as new seqs not always have gi number, then query changes.

    If not available return None.

    :param query_string: string that contains acc and gi from local blast query result
    :return: gb_id if available
    """
    if len(query_string.split("|")) >= 3:
        gb_gi = query_string.split("|")[1]
        assert len(gb_gi.split(".")) < 2, (len(gb_gi.split(".")), gb_gi)
        assert gb_gi.isdigit() is True
        return gb_gi
    else:
        return None

[docs]def get_tax_info_from_acc(gb_id, ids_obj):
    '''takes an accession number and returns the ncbi_id and the taxon name'''
#    debug("Getting tax info from acc {}".format(gb_id))
    ncbi_id = ids_obj.get_ncbiid_from_acc(gb_id)
    tax_name = ids_obj.ncbiid_to_spn.get(ncbi_id)
    if ncbi_id is None:
        sys.stderr.write("Failed to get information for sequence with accession number {}\n".format(gb_id))
    return ncbi_id, tax_name


[docs]def get_ncbi_tax_id(handle):
    """Get the taxon ID from ncbi. ONly used for web queries

    :param handle: NCBI read.handle
    :return: ncbi_id
    """
    ncbi_id = None
    gb_list = handle[0]["GBSeq_feature-table"][0]["GBFeature_quals"]
    for item in gb_list:
        if item[u"GBQualifier_name"] == "db_xref":
            if item[u"GBQualifier_value"][:5] == "taxon":
                ncbi_id = int(item[u"GBQualifier_value"][6:])
                break
            else:
                continue
    return ncbi_id


[docs]def get_ncbi_tax_name(handle):
    """Get the sp name from ncbi.
    Could be replaced by direct lookup to ott_ncbi.

    :param handle: NCBI read.handle
    :return: ncbi_spn
    """
    ncbi_sp = None
    gb_list = handle[0]["GBSeq_feature-table"][0]["GBFeature_quals"]
    for item in gb_list:
        if item[u"GBQualifier_name"] == "organism":
            ncbi_sp = str(item[u"GBQualifier_value"])
            ncbi_sp = ncbi_sp.replace(" ", "_")
    return ncbi_sp


[docs]def strip(inputstr):
    """ Strips of blank characters from string in pd dataframe.
    """
    if isinstance(inputstr, str):
        return inputstr.strip()
    else:
        return inputstr


[docs]def load_nodes(nodes_file):
    """ Loads nodes.dmp and converts it into a pandas.DataFrame.
    Contains the information about the taxonomic hierarchy of names.
    """
    # print(nodes_file)
    assert os.path.exists(nodes_file), (
        "file `%s` does not exist. Make sure you downloaded the "
        "databases from ncbi." % nodes_file
    )
    df = pd.read_csv(
        nodes_file,
        sep="|",
        header=None,
        index_col=False,
        names=[
            "tax_id",
            "parent_tax_id",
            "rank",
            "embl_code",
            "division_id",
            "inherited_div_flag",
            "genetic_code_id",
            "inherited_GC__flag",
            "mitochondrial_genetic_code_id",
            "inherited_MGC_flag",
            "GenBank_hidden_flag",
            "hidden_subtree_root_flag",
            "comments",
        ],
    )
    # To get rid of flanking tab characters
    df["rank"] = df["rank"].apply(strip)
    df["embl_code"] = df["embl_code"].apply(strip)
    df["comments"] = df["comments"].apply(strip)
    return df


[docs]def load_names(names_file):
    """ Loads names.dmp and converts it into a pandas.DataFrame.
    Includes only names which are accepted as scientific name by ncbi.
    """
    assert os.path.exists(names_file), (
        "file `%s` does not exist. Make sure you downloaded the "
        "databases from ncbi." % names_file
    )
    df = pd.read_csv(
        names_file,
        sep="|",
        header=None,
        index_col=False,
        names=["tax_id", "name_txt", "unique_name", "name_class"],
    )
    df["name_txt"] = df["name_txt"].apply(strip)
    df["unique_name"] = df["unique_name"].apply(strip)
    df["name_class"] = df["name_class"].apply(strip)
    sci_df = df[df["name_class"] == "scientific name"]
    sci_df.reset_index(drop=True, inplace=True)
    return sci_df


[docs]def load_synonyms(names_file):
    """Loads names.dmp and converts it into a pandas.DataFrame.
        Includes only names which are viewed as synonym by ncbi.
    """
    assert os.path.exists(names_file), (
        "file `%s` does not exist. Make sure you downloaded "
        "the databases from ncbi." % names_file
    )
    # print("load synonyms")
    df = pd.read_csv(
        names_file,
        sep="|",
        header=None,
        index_col=False,
        names=["tax_id", "name_txt", "unique_name", "name_class"],
    )
    df["name_txt"] = df["name_txt"].apply(strip)
    df["unique_name"] = df["unique_name"].apply(strip)
    df["name_class"] = df["name_class"].apply(strip)
    sci_df = df[df["name_class"] == "synonym"]
    sci_df.reset_index(drop=True, inplace=True)
    return sci_df


[docs]class Parser:
    """Reads in databases from ncbi to connect species names with the taxonomic identifier
    and the corresponding hierarchical information. It provides a much faster way to get those information then using
    web queries. We use those files to get independent from web requests to find those information (the implementation
    of it in BioPython was not really reliable).
    Nodes includes the hierarchical information, names the scientific names and ID's.
    The files need to be updated regularly, best way to always do it when a new blast database was loaded.
    """

    def __init__(self, names_file, nodes_file):
        self.names_file = names_file
        self.nodes_file = nodes_file
        # self.initialize()
        self.nodes = load_nodes(self.nodes_file)
        self.names = load_names(self.names_file)
        self.synonyms = load_synonyms(self.names_file)

[docs]    def get_rank(self, tax_id):
        """ Get rank for given ncbi tax id.
        """
        if tax_id is None:
            rank = "unassigned"
        else:
            nodes = self.nodes
            rank = nodes[nodes["tax_id"] == tax_id]["rank"].values[0]
        return rank

[docs]    def get_downtorank_id(self, tax_id, downtorank="species"):
        """ Recursive function to find the parent id of a taxon as defined by downtorank.
        """
        nodes = self.nodes
        if not isinstance(tax_id, int):
#        debug("get downtorank")
           # sys.stdout.write(
           #     "WARNING: tax_id {} is no integer. Will convert value to int\n".format(
           #         tax_id
           #     )
           # )
            tax_id = int(tax_id)
        # debug(downtorank)
        # following statement is to get id of taxa if taxa is higher ranked than specified
        if nodes[nodes["tax_id"] == tax_id]["rank"].values[0] != "species":
            if downtorank == "species":
                if (nodes[nodes["tax_id"] == tax_id]["rank"].values[0] != "varietas"
                        and nodes[nodes["tax_id"] == tax_id]["rank"].values[0]
                        != "subspecies"):
                    return tax_id
        if nodes[nodes["tax_id"] == tax_id]["rank"].values[0] == downtorank:
            # debug("found right rank")
            return tax_id
        elif nodes[nodes["tax_id"] == tax_id]["rank"].values[0] == "superkingdom":
            tax_id = 0
            return tax_id
        else:
            parent_id = int(nodes[nodes["tax_id"] == tax_id]["parent_tax_id"].values[0])
            return self.get_downtorank_id(parent_id, downtorank)

[docs]    def match_id_to_mrca(self, tax_id, mrca_id):
        """ Recursive function to find out if tax_id is part of mrca_id.
        """
        # debug("match_id_to_mrca")
        nodes = self.nodes
       # debug("testing if {} within {}".format(tax_id, mrca_id))
        current_id = int(tax_id)
        mrca_id = int(mrca_id)
        #debug([rank_mrca_id, rank_tax_id])
        while current_id:
            if current_id == mrca_id:
                # debug("found right rank")
                return True
            elif current_id == 1:
   #             debug("current id is: {}".format(current_id))
                return False
            elif current_id == 0:
                debug("current id is: {}, in search for {} in {}".format(current_id, tax_id, mrca_id))
                return False
            else: #try parent
                try:
                    current_id = int(nodes[nodes["tax_id"] == current_id]["parent_tax_id"].values[0])
                    # to get the except:
                    # current_id = 17043521
                    # nodes = ncbitax.nodes  ## ncbitax object from test_ncbi_parser.py
                except IndexError:
                    sys.stderr.write("no parent found for ncbi:id {}".format(current_id))
                    return False
#                debug("parent id is: {}".format(current_id))

[docs]    def get_name_from_id(self, tax_id):
        """ Find the scientific name for a given ID.
        """
        try:
            names = self.names
            if tax_id == 0:
                tax_name = "unidentified"
            else:
                tax_name = names[names["tax_id"] == tax_id]["name_txt"]
                tax_name = tax_name.values[0].replace(" ", "_")
                tax_name = tax_name.strip()
        except IndexError:
            sys.stdout.write("tax_id {} unknown by ncbi_parser files (names.dmp)\n".format(tax_id))
            tax_name = "unknown_{}".format(tax_id)
            if os.path.exists("ncbi_id_unknown.err"):
                fn = open("ncbi_id_unknown.err", "a")
                fn.write("{}".format(tax_id))
                fn.close()
            else:
                fn = open("ncbi_id_unknown.err", "w")
                fn.write("{}".format(tax_id))
                fn.close()
        return tax_name

[docs]    def get_id_from_name(self, tax_name):
        """ Find the ID for a given taxonomic name.
        """
        names = self.names
        org_tax = tax_name
        tax_name = tax_name.replace("_", " ")
        if len(tax_name.split(" ")) >= 2:
            if tax_name.split(" ")[1] == "sp.":
                tax_name = "{}".format(tax_name.split(" ")[0])
        try:
            tax_id = names[names["name_txt"] == tax_name]["tax_id"].values[0]
        except IndexError:
            if len(tax_name.split(" ")) == 3:
                tax_name = "{} {}-{}".format(
                    tax_name.split(" ")[0],
                    tax_name.split(" ")[1],
                    tax_name.split(" ")[2],
                )
                tax_id = names[names["name_txt"] == tax_name]["tax_id"].values[0]
                sys.stdout.write(
                    "tax_name {} unknown, modified to {} worked.\n".format(org_tax, tax_name)
                )
            else:
                sys.stdout.write(
                    "Are you sure, its an accepted name and not a synonym: {}? "
                    "I look in the synonym table now.\n".format(tax_name)
                )
                tax_id = self.get_id_from_synonym(tax_name)
        tax_id = int(tax_id)
        return tax_id

[docs]    def get_id_from_synonym(self, tax_name):
        """ Find the ID for a given taxonomic name, which is not an accepted name.
        """
        names = self.names
        synonyms = self.synonyms
        tax_name = tax_name.replace("_", " ")
        try:
            tax_id = synonyms[synonyms["name_txt"] == tax_name]["tax_id"].values[0]
        except IndexError:
            if len(tax_name.split(" ")) == 3:
                tax_name = "{} {}-{}".format(
                    tax_name.split(" ")[0],
                    tax_name.split(" ")[1],
                    tax_name.split(" ")[2],
                )
                tax_id = names[names["name_txt"] == tax_name]["tax_id"].values[0]
            else:
                sys.stderr.write("ncbi taxon name unknown by parser files: {}, taxid set to 0.\n".format(tax_name))
                tax_id = 0
                if os.path.exists("ncbi_name_unknown.err"):
                    fn = open("ncbi_name_unknown.err", "a")
                    fn.write("{}".format(tax_id))
                    fn.close()
                else:
                    fn = open("ncbi_name_unknown.err", "w")
                    fn.write("{}".format(tax_id))
                    fn.close()
        return tax_id

RetroSearch is an open source project built by @garambo | Open a GitHub Issue

Search and Browse the WWW like it's 1997 | Search results from DuckDuckGo

HTML: 3.2 | Encoding: UTF-8 | Version: 0.7.4