#!/usr/bin/env python import argparse import logging import os import sys from itertools import islice from bioapi import MetageneDBCatalogGeneAPI, MetageneDBCatalogTaxonomyAPI from requests.exceptions import HTTPError from slugify import slugify from metagenedb.common.utils.parsers import IGCLineParser logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s') logger = logging.getLogger() class ImportIGCGenes(object): METAGENEDB_GENE_API = MetageneDBCatalogGeneAPI METAGENEDB_TAXONOMY_API = MetageneDBCatalogTaxonomyAPI PHYLUM_COL = 'taxo_phylum' GENUS_COL = 'taxo_genus' SELECTED_KEYS = ['gene_id', 'length', 'kegg_ko', PHYLUM_COL, GENUS_COL] def __init__(self, annotation_file, url, skip_tax=False, skip_functions=False): self.annotation_file = annotation_file self.url = url self.metagenedb_gene_api = self.METAGENEDB_GENE_API(base_url=self.url) self.metagenedb_taxonomy_api = self.METAGENEDB_TAXONOMY_API(base_url=self.url) self.total_genes = self._get_number_genes() self._reset_counters() # Skip some insertion if specified in script options self.skip_tax = skip_tax self.skip_functions = skip_functions def _reset_counters(self): self.processed_genes = 0 self.created_genes = 0 self.updated_genes = 0 self.skipped_genes = 0 def _get_number_genes(self): if not os.path.isfile(self.annotation_file): return 0 with open(self.annotation_file) as f: for i, l in enumerate(f): pass return i + 1 def _select_taxonomy(self, gene_dict, unknown_val='unknown'): """ Select the taxonomy to be assigned for the gene. genus has priority on phylum. If both unknow, remove the taxonomy key """ phylum = gene_dict.pop(self.PHYLUM_COL) genus = gene_dict.pop(self.GENUS_COL) if self.skip_tax: return gene_dict resp_dict = {} if genus != unknown_val: resp_dict = self.metagenedb_taxonomy_api.get_all(params={'name': genus, 'rank': 'genus'}) if len(resp_dict['results']) > 1: logger.warning(f"More than 1 result found for genus {genus}. First result is kept.") elif phylum != unknown_val: resp_dict = self.metagenedb_taxonomy_api.get_all(params={'name': phylum, 'rank': 'phylum'}) if len(resp_dict['results']) > 1: logger.warning(f"More than 1 result found for phylum {phylum}. First result is kept.") if resp_dict.get('count', 0) > 0: gene_dict.update({'taxonomy': resp_dict['results'][0]['tax_id']}) return gene_dict def _parse_gene(self, raw_line, selected_keys=SELECTED_KEYS): """ Use IGCLineParser and return selected keys """ gene_parser = IGCLineParser() all_dict = gene_parser.gene(raw_line) selected_dict = {k: v for k, v in all_dict.items() if k in selected_keys} return selected_dict def _clean_gene(self, gene_dict): gene_dict['gene_name'] = gene_dict['gene_id'] gene_dict['gene_id'] = slugify(gene_dict['gene_id']) gene_dict['functions'] = gene_dict.pop('kegg_ko') gene_dict = self._select_taxonomy(gene_dict) if self.skip_functions or 'unknown' in gene_dict['functions']: gene_dict.pop('functions') return gene_dict def load_annotation_file_to_db_in_chunks(self, chunk_size=1000, test=False): with open(self.annotation_file, 'r') as file: while True: chunk_genes = list(islice(file, chunk_size)) if not chunk_genes: break genes = [self._clean_gene(self._parse_gene(i)) for i in chunk_genes] try: response = self.metagenedb_gene_api.put(genes) self.created_genes += response.get('created').get('count') self.updated_genes += response.get('updated').get('count') except HTTPError as http_error: logging.warning("%s: %s; %s", http_error, http_error.response.json(), genes) self.skipped_genes += len(genes) self.processed_genes += len(chunk_genes) logger.info("%s Genes processed so far...", self.processed_genes) if test is True: break logger.info("[DONE] %s/%s Genes created.", self.created_genes, self.total_genes) logger.info("[DONE] %s/%s Genes updated.", self.updated_genes, self.total_genes) logger.info("[DONE] %s/%s Genes skipped.", self.skipped_genes, self.total_genes) def parse_arguments(): """ Defines parser. """ parser = argparse.ArgumentParser(description='Populate database from a given IGC annotation file.') # Common arguments for analysis and annotations parser.add_argument('annotation', help='IGC annotation file') parser.add_argument('--url', help='base URL of the instance.', default='http://localhost/') parser.add_argument('--chunk_size', type=int, default=1000, help='How many genes to handle and create in the same time.') parser.add_argument('--skip_taxonomy', action='store_true', help='Skip taxonomy information from genes.') parser.add_argument('--skip_functions', action='store_true', help='Skip functions information from genes.') parser.add_argument('--test', action='store_true', help='Run only on first chunk.') parser.add_argument('-v', '--verbose', action='store_true') try: return parser.parse_args() except SystemExit: sys.exit(1) def run(): args = parse_arguments() if args.verbose: logger.setLevel(logging.INFO) import_igc_genes = ImportIGCGenes(args.annotation, args.url, skip_tax=args.skip_taxonomy, skip_functions=args.skip_functions) import_igc_genes.load_annotation_file_to_db_in_chunks(chunk_size=args.chunk_size, test=args.test) if __name__ == "__main__": run()