Commit 9b1503a2 authored by Kenzo-Hugo Hillion's avatar Kenzo-Hugo Hillion
Browse files

Update the taxonomy loading script so that it goes through the API

parent 5cf966b4
# Metagenedb
[![pipeline status](https://gitlab.pasteur.fr/metagenomics/metagenedb/badges/master/pipeline.svg)](https://gitlab.pasteur.fr/metagenomics/metagenedb/commits/master)
[![coverage report](https://gitlab.pasteur.fr/metagenomics/metagenedb/badges/master/coverage.svg)](https://gitlab.pasteur.fr/metagenomics/metagenedb/commits/master)
[![pipeline status](https://gitlab.pasteur.fr/metagenomics/metagenedb/badges/dev/pipeline.svg)](https://gitlab.pasteur.fr/metagenomics/metagenedb/commits/dev)
[![coverage report](https://gitlab.pasteur.fr/metagenomics/metagenedb/badges/dev/coverage.svg)](https://gitlab.pasteur.fr/metagenomics/metagenedb/commits/dev)
Django based project to build genes catalog and tools
to play with it and contact external services.
......
#!/usr/bin/env python
import argparse
import logging
import os
import sys
from itertools import islice
import django
from bioapi import MetageneDBCatalogTaxonomyAPI
from metagenedb.common.utils.parsers import NCBITaxonomyLineParser
# django.setup() must be called before importing the models so the app registry is loaded.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "metagenedb.settings")
django.setup()
from metagenedb.apps.catalog.models import Taxonomy # noqa
from metagenedb.apps.catalog.serializers import TaxonomySerializer # noqa
# NOTE(review): both _LOGGER (module logger) and logger (root logger) are used
# below — presumably mid-refactor; confirm which one should remain.
_LOGGER = logging.getLogger(__name__)
logging.basicConfig()
logger = logging.getLogger()
# "parent__parent__...__parent": a 41-level select_related lookup so a taxon's
# ancestor chain is fetched in a single query.
SELECT_RELATED_PARENT = "parent{}".format("__parent" * 40)
def import_names(taxonomy_names_file, select_class="scientific name"):
    """
    Return a dict mapping tax_id -> tax name for lines matching select_class.
    """
    _LOGGER.info(f"Importing {select_class} from {taxonomy_names_file}...")
    with open(taxonomy_names_file, "r") as names_fh:
        # Parse only the lines of the requested name class; on duplicate
        # tax_ids the last occurrence wins, as in a plain assignment loop.
        parsed_entries = (
            NCBITaxonomyLineParser.name(raw_line)
            for raw_line in names_fh
            if select_class in raw_line
        )
        return {entry.get('tax_id'): entry.get('name_txt') for entry in parsed_entries}
# NOTE(review): diff residue — the legacy module-level create_taxo_nodes()
# below is interleaved with the header of the new ImportNCBITaxonomy class;
# the class body actually continues further down in the file.
def create_taxo_nodes(taxonomy_nodes_file, taxo_name_dict):
_LOGGER.info(f"Create taxonomy objects from {taxonomy_nodes_file}...")
class ImportNCBITaxonomy(object):
# API client class used to talk to the taxonomy endpoint.
METAGENEDB_TAX_API = MetageneDBCatalogTaxonomyAPI
# Fields stripped before creation (parents are linked in a second pass).
FOREIGN_KEY_FIELDS = ['parent_tax_id']
# --- legacy function body continues here ---
with open(taxonomy_nodes_file, "r") as file:
cpt = 0
for i in file:
node = NCBITaxonomyLineParser.node(i)
# Fall back to a placeholder when the tax_id has no name entry.
node['name'] = taxo_name_dict.get(node.get('tax_id'), "No name")
for key in FOREIGN_KEY_FIELDS:
del node[key]
serializer = TaxonomySerializer(data=node)
if serializer.is_valid():
serializer.save()
cpt += 1
else:
_LOGGER.warning(f"Invalid data: {serializer.errors}. Insertion skipped. Data: {serializer.data}")
# Progress log every 10000 successful creations.
if cpt % 10000 == 0:
_LOGGER.info(f"{cpt} taxonomies created...")
_LOGGER.info(f"[DONE] {cpt} taxonomies created.")
def update_taxo_nodes(taxonomy_nodes_file):
    """
    Second pass over nodes.dmp: re-serialize each existing Taxonomy so its
    direct parent link gets attached (all rows already exist after creation).
    """
    _LOGGER.info(f"Linking taxonomy objects to direct parental node from {taxonomy_nodes_file}...")
    all_taxo_count = Taxonomy.objects.select_related(SELECT_RELATED_PARENT).all().count()
    cpt = 0
    with open(taxonomy_nodes_file, "r") as nodes_fh:
        for raw_line in nodes_fh:
            node = NCBITaxonomyLineParser.node(raw_line)
            taxonomy = Taxonomy.objects.get(tax_id=node.get('tax_id'))
            serializer = TaxonomySerializer(taxonomy, data=node)
            if not serializer.is_valid():
                _LOGGER.warning(f"Invalid data: {serializer.errors}. Link to parent skipped. Data: {serializer.data}")
            else:
                serializer.save()
                cpt += 1
            # Progress log every 10000 successful updates.
            if cpt % 10000 == 0:
                _LOGGER.info(f"{cpt}/{all_taxo_count} taxonomies updated...")
    _LOGGER.info(f"[DONE] {cpt}/{all_taxo_count} taxonomies updated.")
def __init__(self, url, tax_names_file, tax_nodes_file):
self.metagenedb_tax_api = self.METAGENEDB_TAX_API(base_url=url)
self.tax_names_file = tax_names_file
self.tax_nodes_file = tax_nodes_file
self.total_tax = self._get_number_nodes()
self._reset_counters()
def _reset_counters(self):
self.processed_tax = 0
self.created_tax = 0
self.updated_tax = 0
self.skipped_tax = 0
def _get_number_nodes(self):
with open(self.tax_nodes_file) as f:
for i, l in enumerate(f):
pass
return i + 1
def import_names(self, select_class="scientific name"):
    """
    Build and return a dict {tax_id: name} for entries matching select_class.
    """
    logger.info("Importing %s from %s...", select_class, self.tax_names_file)
    names_by_tax_id = {}
    with open(self.tax_names_file, "r") as names_fh:
        # Keep only the lines of the requested name class before parsing.
        matching_lines = (raw for raw in names_fh if select_class in raw)
        for raw_line in matching_lines:
            parsed = NCBITaxonomyLineParser.name(raw_line)
            names_by_tax_id[parsed.get('tax_id')] = parsed.get('name_txt')
    return names_by_tax_id
def _process_nodes_for_creation(self, nodes, taxo_name_dict):
for node in nodes:
node['name'] = taxo_name_dict.get(node['tax_id'], "No name")
for key in self.FOREIGN_KEY_FIELDS:
del node[key]
return nodes
def create_taxo_nodes(self, taxo_name_dict, chunk_size=1000):
    """Create/update Taxonomy entries through the API, chunk by chunk.

    Args:
        taxo_name_dict: mapping tax_id -> name, as returned by import_names().
        chunk_size: number of nodes sent per API PUT request.

    Fix: removed the stray unconditional `break` that stopped the loop after
    the first chunk, so only the first `chunk_size` nodes were ever loaded.
    """
    # Reset so repeated calls report per-run counts (mirrors update_taxo_nodes).
    self._reset_counters()
    logger.info("Create taxonomy objects from %s...", self.tax_nodes_file)
    with open(self.tax_nodes_file, "r") as f:
        while True:
            next_nodes = list(islice(f, chunk_size))
            if not next_nodes:
                break
            nodes = [NCBITaxonomyLineParser.node(i) for i in next_nodes]
            nodes = self._process_nodes_for_creation(nodes, taxo_name_dict)
            response = self.metagenedb_tax_api.put(nodes)
            self.created_tax += response.get('created').get('count')
            self.updated_tax += response.get('updated').get('count')
            self.processed_tax += len(nodes)
            logger.info("%s/%s Taxonomy processed so far...", self.processed_tax, self.total_tax)
    logger.info("[DONE] %s/%s Taxonomy created.", self.created_tax, self.total_tax)
    logger.info("[DONE] %s/%s Taxonomy updated.", self.updated_tax, self.total_tax)
    logger.info("[DONE] %s/%s Taxonomy skipped.", self.skipped_tax, self.total_tax)
def update_taxo_nodes(self, chunk_size=1000):
    """Second pass: PUT every node again so parent links get attached.

    Runs after create_taxo_nodes (all taxonomies then exist), so the
    parent_tax_id references can be resolved server-side.

    Args:
        chunk_size: number of nodes sent per API PUT request.

    Fix: removed the stray unconditional `break` that stopped the loop after
    the first chunk; also dropped the pointless f-prefix on the lazy-% log call.
    """
    self._reset_counters()
    logger.info("Linking taxonomy objects to direct parental node from %s...", self.tax_nodes_file)
    with open(self.tax_nodes_file, "r") as f:
        while True:
            next_nodes = list(islice(f, chunk_size))
            if not next_nodes:
                break
            nodes = [NCBITaxonomyLineParser.node(i) for i in next_nodes]
            response = self.metagenedb_tax_api.put(nodes)
            self.created_tax += response.get('created').get('count')
            self.updated_tax += response.get('updated').get('count')
            self.processed_tax += len(nodes)
            logger.info("%s/%s Taxonomy processed so far...", self.processed_tax, self.total_tax)
    logger.info("[DONE] %s/%s Taxonomy created.", self.created_tax, self.total_tax)
    logger.info("[DONE] %s/%s Taxonomy updated.", self.updated_tax, self.total_tax)
    logger.info("[DONE] %s/%s Taxonomy skipped.", self.skipped_tax, self.total_tax)
"""
_build_hierarchy and build_all_hierarchy need to be moved and executed through a specific endpoint
It will be much faster to build all the hierarchy from the backend server directly.
def _build_hierarchy(taxo):
hierarchy = taxo.build_parental_hierarchy()
......@@ -82,23 +108,25 @@ def _build_hierarchy(taxo):
if serializer.is_valid():
serializer.save()
else:
_LOGGER.warning(f"Invalid data: {serializer.errors}. Building hierarchy skipped. Data: {serializer.data}")
logger.warning(f"Invalid data: {serializer.errors}. Building hierarchy skipped. Data: {serializer.data}")
def build_all_hierarchy(chunk_size=8000):
"""
'''
Uses class method from Taxonomy model to retrieve the parental hierarchy and
assign corresponding attribute to each entry.
"""
_LOGGER.info(f"Linking taxonomy objects to parental nodes from direct parental nodes...")
'''
logger.info(f"Linking taxonomy objects to parental nodes from direct parental nodes...")
all_taxo = Taxonomy.objects.select_related(SELECT_RELATED_PARENT).all()
cpt = 0
for taxo in all_taxo.iterator(chunk_size=chunk_size):
_build_hierarchy(taxo)
cpt += 1
if cpt % 10000 == 0:
_LOGGER.info(f"{cpt}/{all_taxo.count()} hierachies built...")
_LOGGER.info(f"[DONE] {cpt}/{all_taxo.count()} hierachies built.")
logger.info(f"{cpt}/{all_taxo.count()} hierachies built...")
logger.info(f"[DONE] {cpt}/{all_taxo.count()} hierachies built.")
"""
def parse_arguments():
......@@ -109,6 +137,8 @@ def parse_arguments():
# Common arguments for analysis and annotations
parser.add_argument('--nodes', help='nodes.dmp file from ncbi_taxonomy', required=True)
parser.add_argument('--names', help='names.dmp file from ncbi_taxonomy', required=True)
parser.add_argument('--url', help='base URL of the instance.', default='http://localhost/')
parser.add_argument('-v', '--verbose', action='store_true')
try:
return parser.parse_args()
......@@ -118,10 +148,12 @@ def parse_arguments():
def run():
# Entry point: parse CLI args, then load the NCBI taxonomy through the API.
args = parse_arguments()
# NOTE(review): diff residue — the next four lines are the REMOVED (old) run()
# body; only the lines after them belong to the new version of the script.
taxonomy_names = import_names(args.names)
create_taxo_nodes(args.nodes, taxonomy_names)
update_taxo_nodes(args.nodes)
build_all_hierarchy()
# -v/--verbose raises the root logger level so progress INFO logs are shown.
if args.verbose:
logger.setLevel(logging.INFO)
# New flow: everything goes through the ImportNCBITaxonomy API wrapper.
import_ncbi_tax = ImportNCBITaxonomy(args.url, args.names, args.nodes)
taxonomy_names = import_ncbi_tax.import_names()
import_ncbi_tax.create_taxo_nodes(taxonomy_names)
import_ncbi_tax.update_taxo_nodes()
if __name__ == "__main__":
......
......@@ -4,7 +4,6 @@ import logging
import requests
import sys
import time
from requests.exceptions import HTTPError
from bioapi import MetageneDBCatalogFunctionAPI
......@@ -28,13 +27,6 @@ class ImportKEGGKO(object):
self.updated_kegg = 0
self.skipped_kegg = 0
def _upsert_kegg_ko(self, kegg_ko):
    """Create the KEGG KO entry via the API, or update it if it already exists."""
    function_id = kegg_ko.get('function_id')
    try:
        # GET probes for existence; if it succeeds, PUT updates in place.
        self.metagenedb_function_api.get(function_id)
        self.metagenedb_function_api.put(function_id, kegg_ko)
    except HTTPError:
        # GET (or PUT) failed with an HTTP error -> create the entry instead.
        self.metagenedb_function_api.post(kegg_ko)
def load_all_kegg_ko(self, chunk_size=1000):
all_ko_response = requests.get(self.kegg_ko_list_api)
all_ko_response.raise_for_status()
......@@ -46,11 +38,11 @@ class ImportKEGGKO(object):
self.created_kegg += response.get('created').get('count')
self.updated_kegg += response.get('updated').get('count')
self.processed_kegg += len(ko_chunk)
logger.info(f"%s/%s KEGG KO processed so far...", self.processed_kegg, self.total_kegg_nb)
logger.info("%s/%s KEGG KO processed so far...", self.processed_kegg, self.total_kegg_nb)
time.sleep(1)
logger.info(f"[DONE] %s/%s KEGG KO created.", self.created_kegg, self.total_kegg_nb)
logger.info(f"[DONE] %s/%s KEGG KO updated.", self.updated_kegg, self.total_kegg_nb)
logger.info(f"[DONE] %s/%s KEGG KO skipped.", self.skipped_kegg, self.total_kegg_nb)
logger.info("[DONE] %s/%s KEGG KO created.", self.created_kegg, self.total_kegg_nb)
logger.info("[DONE] %s/%s KEGG KO updated.", self.updated_kegg, self.total_kegg_nb)
logger.info("[DONE] %s/%s KEGG KO skipped.", self.skipped_kegg, self.total_kegg_nb)
def parse_arguments():
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment