# import_ncbi_taxonomy.py
#!/usr/bin/env python
"""Import NCBI taxonomy dump files (names.dmp / nodes.dmp) into MetageneDB."""
import argparse
import logging
import sys

from itertools import islice

from dabeplech import MetageneDBCatalogTaxonomyAPI

from metagenedb.common.utils.parsers import NCBITaxonomyLineParser

logging.basicConfig()
logger = logging.getLogger()

# "parent__parent__..." lookup path following the parent FK 41 levels up —
# presumably a Django-style select_related path used to prefetch a taxon's
# ancestry in one query (TODO confirm against the consumer of this constant).
SELECT_RELATED_PARENT = "parent{}".format("__parent" * 40)
class ImportNCBITaxonomy(object):
    """Import NCBI taxonomy dump files into MetageneDB through its REST API.

    The import is a two-pass process:
      1. ``create_taxo_nodes`` creates/updates every taxon with its display
         name but WITHOUT its parent link (the parent row may not exist yet).
      2. ``update_taxo_nodes`` sends the nodes again with ``parent_tax_id``
         kept, so each taxon gets linked to its (now existing) parent.
    """

    # API client class; instantiated per importer in __init__.
    METAGENEDB_TAX_API = MetageneDBCatalogTaxonomyAPI
    # Fields stripped on the first pass because the referenced row may not
    # exist in the database yet.
    FOREIGN_KEY_FIELDS = ['parent_tax_id']

    def __init__(self, url, jwt_token, tax_names_file, tax_nodes_file):
        """
        Args:
            url: base URL of the MetageneDB instance.
            jwt_token: JWT token used to authenticate API calls.
            tax_names_file: path to NCBI ``names.dmp``.
            tax_nodes_file: path to NCBI ``nodes.dmp``.
        """
        self.metagenedb_tax_api = self.METAGENEDB_TAX_API(base_url=url, jwt_token=jwt_token)
        self.tax_names_file = tax_names_file
        self.tax_nodes_file = tax_nodes_file
        self.total_tax = self._get_number_nodes()
        self._reset_counters()

    def _reset_counters(self):
        """Reset all progress counters to zero."""
        self.processed_tax = 0
        self.created_tax = 0
        self.updated_tax = 0
        # NOTE: nothing in the current flow increments skipped_tax; it is
        # kept so the summary logs stay symmetrical.
        self.skipped_tax = 0

    def _get_number_nodes(self):
        """Return the number of lines (i.e. taxonomy nodes) in the nodes file.

        Fix: the previous enumerate-based count raised NameError on an empty
        file; ``sum`` over the line iterator correctly returns 0.
        """
        with open(self.tax_nodes_file) as f:
            return sum(1 for _ in f)

    def import_names(self, select_class="scientific name"):
        """
        Build and return a dict {tax_id: tax_name} for the chosen select_class.
        """
        logger.info("Importing %s from %s...", select_class, self.tax_names_file)
        taxo_name_dict = {}
        with open(self.tax_names_file, "r") as file:
            for line in file:
                # Cheap substring pre-filter before paying for a full parse.
                if select_class in line:
                    name = NCBITaxonomyLineParser.name(line)
                    taxo_name_dict[name.get('tax_id')] = name.get('name_txt')
        return taxo_name_dict

    def _process_nodes_for_creation(self, nodes, taxo_name_dict):
        """Attach display names and strip FK fields (in place) for the first pass."""
        for node in nodes:
            node['name'] = taxo_name_dict.get(node['tax_id'], "No name")
            for key in self.FOREIGN_KEY_FIELDS:
                del node[key]
        return nodes

    def _push_nodes(self, chunk_size, transform=None):
        """PUT every node of the nodes file to the API in chunks.

        Shared engine for both import passes; updates the progress counters
        and emits the summary logs.

        Args:
            chunk_size: number of nodes sent per API call.
            transform: optional callable applied to each parsed chunk before
                it is sent.
        """
        with open(self.tax_nodes_file, "r") as f:
            while True:
                lines = list(islice(f, chunk_size))
                if not lines:
                    break
                nodes = [NCBITaxonomyLineParser.node(line) for line in lines]
                if transform is not None:
                    nodes = transform(nodes)
                response = self.metagenedb_tax_api.put(nodes)
                self.created_tax += response.get('created').get('count')
                self.updated_tax += response.get('updated').get('count')
                self.processed_tax += len(nodes)
                logger.info("%s/%s Taxonomy processed so far...", self.processed_tax, self.total_tax)
        self._log_summary()

    def _log_summary(self):
        """Log the final created/updated/skipped counts."""
        logger.info("[DONE] %s/%s Taxonomy created.", self.created_tax, self.total_tax)
        logger.info("[DONE] %s/%s Taxonomy updated.", self.updated_tax, self.total_tax)
        logger.info("[DONE] %s/%s Taxonomy skipped.", self.skipped_tax, self.total_tax)

    def create_taxo_nodes(self, taxo_name_dict, chunk_size=1000):
        """First pass: create/update all taxa, named but without parent links."""
        logger.info("Create taxonomy objects from %s...", self.tax_nodes_file)
        self._push_nodes(
            chunk_size,
            transform=lambda nodes: self._process_nodes_for_creation(nodes, taxo_name_dict),
        )

    def update_taxo_nodes(self, chunk_size=1000):
        """Second pass: re-send nodes with ``parent_tax_id`` to link parents."""
        self._reset_counters()
        logger.info("Linking taxonomy objects to direct parental node from %s...", self.tax_nodes_file)
        self._push_nodes(chunk_size)

def parse_arguments():
    """
    Define and parse command-line arguments.

    Returns:
        argparse.Namespace with attributes: nodes, names, skip_creation,
        url, jwt_token, verbose.
    """
    parser = argparse.ArgumentParser(description='Populate database from a given NCBI taxonomy files.')
    # Common arguments for analysis and annotations
    parser.add_argument('--nodes', help='nodes.dmp file from ncbi_taxonomy', required=True)
    parser.add_argument('--names', help='names.dmp file from ncbi_taxonomy', required=True)
    parser.add_argument('--skip_creation', action='store_true', help='Skip taxonomy creation.')
    parser.add_argument('--url', help='base URL of the instance.', default='http://localhost/')
    parser.add_argument('-t', '--jwt_token', help='your JWT token obtain from web app', required=True)
    parser.add_argument('-v', '--verbose', action='store_true')
    # Fix: a previous version wrapped parse_args() in
    # `try: ... except SystemExit: sys.exit(1)`, which turned `--help`
    # (exit code 0) into a failure and masked argparse's conventional
    # exit code 2 on bad usage. Let argparse exit on its own.
    return parser.parse_args()


def run():
    """Entry point: parse CLI arguments and import the NCBI taxonomy dump."""
    args = parse_arguments()
    if args.verbose:
        logger.setLevel(logging.INFO)
    importer = ImportNCBITaxonomy(args.url, args.jwt_token, args.names, args.nodes)
    names_by_tax_id = importer.import_names()
    # NOTE(review): --skip_creation also skips update_taxo_nodes(); confirm
    # that is intended, since the flag's help text only mentions creation.
    if not args.skip_creation:
        importer.create_taxo_nodes(names_by_tax_id)
        importer.update_taxo_nodes()


if __name__ == "__main__":
    run()