#!/usr/bin/env python import argparse import logging import os import sys from itertools import islice from bioapi import ( MetageneDBCatalogFunctionAPI, MetageneDBCatalogGeneAPI, MetageneDBCatalogTaxonomyAPI, MetageneDBCatalogKeggOrthologyAPI, MetageneDBCatalogEggNogAPI, ) from requests.exceptions import HTTPError from slugify import slugify from metagenedb.common.utils.parsers import IGCLineParser logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s') logger = logging.getLogger() class ImportIGCGenes(object): METAGENEDB_GENE_API = MetageneDBCatalogGeneAPI METAGENEDB_TAXONOMY_API = MetageneDBCatalogTaxonomyAPI METAGENEDB_FUNCTION_API = MetageneDBCatalogFunctionAPI METAGENEDB_KEGG_API = MetageneDBCatalogKeggOrthologyAPI METAGENEDB_EGGNOG_API = MetageneDBCatalogEggNogAPI PHYLUM_COL = 'taxo_phylum' GENUS_COL = 'taxo_genus' SELECTED_KEYS = ['gene_id', 'length', 'kegg_ko', 'eggnog', PHYLUM_COL, GENUS_COL] def __init__(self, annotation_file, url, jwt_token, skip_tax=False, skip_functions=False): self.annotation_file = annotation_file self.url = url self._open_api_endpoints(jwt_token) self.total_genes = self._get_number_genes() self._reset_counters() # Skip some insertion if specified in script options self.skip_tax = skip_tax self.skip_functions = skip_functions def _reset_counters(self): self.processed_genes = 0 self.created_genes = 0 self.updated_genes = 0 self.skipped_genes = 0 def _open_api_endpoints(self, jwt_token): self.metagenedb_gene_api = self.METAGENEDB_GENE_API(base_url=self.url, jwt_token=jwt_token) self.metagenedb_taxonomy_api = self.METAGENEDB_TAXONOMY_API(base_url=self.url, jwt_token=jwt_token) self.metagenedb_function_api = self.METAGENEDB_FUNCTION_API(base_url=self.url, jwt_token=jwt_token) self.metagenedb_kegg_api = self.METAGENEDB_KEGG_API(base_url=self.url, jwt_token=jwt_token) self.metagenedb_eggnog_api = self.METAGENEDB_EGGNOG_API(base_url=self.url, jwt_token=jwt_token) def _build_taxo_mapping(self, rank, page_size=1000): logger.info("Building local mapping for %s level...", rank) counter = 1 next_page = None mapping = {} while counter == 1 or next_page is not None: params = { 'page': counter, 'page_size': page_size, 'rank': rank, } current_page = self.metagenedb_taxonomy_api.get_all(params=params) next_page = current_page['next'] mapping.update({ value['name']: value['tax_id'] for value in current_page['results'] }) counter += 1 return mapping def _retrieve_function_catalog(self, api, page_size=1000): logger.info("Building local catalog from %s...", api.ROUTE) counter = 1 next_page = None functions = set() while counter == 1 or next_page is not None: params = { 'page': counter, 'page_size': page_size, } current_page = api.get_all(params=params) next_page = current_page['next'] functions = functions.union(set( [item['function_id'] for item in current_page['results']] )) counter += 1 return functions def build_function_mappings(self, page_size=1000): self.metagenedb_keggs = self._retrieve_function_catalog(self.metagenedb_kegg_api, page_size=page_size) self.metagenedb_eggnogs = self._retrieve_function_catalog(self.metagenedb_eggnog_api, page_size=page_size) def build_mapping(self, page_size=1000): self.phylum_mapping = self._build_taxo_mapping("phylum", page_size=page_size) self.genus_mapping = self._build_taxo_mapping("genus", page_size=page_size) def _get_number_genes(self): if not os.path.isfile(self.annotation_file): return 0 with open(self.annotation_file) as f: for i, l in enumerate(f): pass return i + 1 def _select_taxonomy(self, gene_dict, unknown_val='unknown'): """ Select the taxonomy to be assigned for the gene. genus has priority on phylum. If both unknow, remove the taxonomy key """ phylum = gene_dict.pop(self.PHYLUM_COL) genus = gene_dict.pop(self.GENUS_COL) if self.skip_tax: return gene_dict taxonomy_id = None if genus != unknown_val: taxonomy_id = self.genus_mapping.get(genus, None) if taxonomy_id is None: logger.warning("No tax_id found for genus %s" % genus) elif phylum != unknown_val: taxonomy_id = self.phylum_mapping.get(phylum, None) if taxonomy_id is None: logger.warning("No tax_id found for phylum %s" % genus) if taxonomy_id is not None: gene_dict.update({'taxonomy': taxonomy_id}) return gene_dict def _parse_gene(self, raw_line, selected_keys=SELECTED_KEYS): """ Use IGCLineParser and return selected keys """ gene_parser = IGCLineParser() all_dict = gene_parser.gene(raw_line) selected_dict = {k: v for k, v in all_dict.items() if k in selected_keys} return selected_dict def _clean_functions(self, functions): clean_functions = [] for function in functions: if function['function_id'] in getattr(self, f"metagenedb_{function['source']}s"): clean_functions.append(function['function_id']) else: logger.warning("Function %s not found from %s in metagenedb", function['function_id'], function['source']) return clean_functions def _clean_gene(self, gene_dict): gene_dict['gene_name'] = gene_dict['gene_id'] gene_dict['gene_id'] = slugify(gene_dict['gene_id']) gene_dict['functions'] = [ {'source': 'kegg', 'function_id': v} for v in gene_dict.pop('kegg_ko') if v != 'unknown'] + \ [{'source': 'eggnog', 'function_id': v} for v in gene_dict.pop('eggnog') if v != 'unknown'] gene_dict = self._select_taxonomy(gene_dict) if self.skip_functions or not gene_dict['functions']: gene_dict.pop('functions') else: gene_dict['functions'] = self._clean_functions(gene_dict['functions']) return gene_dict def load_annotation_file_to_db_in_chunks(self, chunk_size=1000, test=False): if not self.skip_tax: self.build_mapping() if not self.skip_functions: self.build_function_mappings() with open(self.annotation_file, 'r') as file: while True: chunk_genes = list(islice(file, chunk_size)) if not chunk_genes: break genes = [self._clean_gene(self._parse_gene(i)) for i in chunk_genes] try: response = self.metagenedb_gene_api.put(genes) self.created_genes += response.get('created').get('count') self.updated_genes += response.get('updated').get('count') except HTTPError as http_error: logging.warning("%s: %s; %s", http_error, http_error.response.json(), genes) self.skipped_genes += len(genes) self.processed_genes += len(chunk_genes) logger.info("%s Genes processed so far...", self.processed_genes) if test is True: break logger.info("[DONE] %s/%s Genes created.", self.created_genes, self.total_genes) logger.info("[DONE] %s/%s Genes updated.", self.updated_genes, self.total_genes) logger.info("[DONE] %s/%s Genes skipped.", self.skipped_genes, self.total_genes) def parse_arguments(): """ Defines parser. """ parser = argparse.ArgumentParser(description='Populate database from a given IGC annotation file.') # Common arguments for analysis and annotations parser.add_argument('annotation', help='IGC annotation file') parser.add_argument('--url', help='base URL of the instance.', default='http://localhost/') parser.add_argument('-t', '--jwt_token', help='your JWT token obtain from web app', required=True) parser.add_argument('--chunk_size', type=int, default=1000, help='How many genes to handle and create in the same time.') parser.add_argument('--skip_taxonomy', action='store_true', help='Skip taxonomy information from genes.') parser.add_argument('--skip_functions', action='store_true', help='Skip functions information from genes.') parser.add_argument('--test', action='store_true', help='Run only on first chunk.') parser.add_argument('-v', '--verbose', action='store_true') try: return parser.parse_args() except SystemExit: sys.exit(1) def run(): args = parse_arguments() if args.verbose: logger.setLevel(logging.INFO) import_igc_genes = ImportIGCGenes(args.annotation, args.url, args.jwt_token, skip_tax=args.skip_taxonomy, skip_functions=args.skip_functions) import_igc_genes.load_annotation_file_to_db_in_chunks(chunk_size=args.chunk_size, test=args.test) if __name__ == "__main__": run()