#!/usr/bin/env python
"""Populate a MetageneDB instance from an IGC annotation file.

Each line of the annotation file is parsed with ``IGCLineParser``, its
taxonomy is resolved against the local ``Taxonomy`` table, and the resulting
gene is created or updated through the MetageneDB catalog gene API.
"""
import argparse
import logging
import os
import sys
from itertools import islice

import django
from requests.exceptions import HTTPError
from rest_framework.exceptions import ValidationError
from slugify import slugify

from metagenedb.common.utils.api import MetageneDBCatalogGeneAPI
from metagenedb.common.utils.parsers import IGCLineParser

# django.setup() must be called before importing the models so that the apps
# are loaded.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "metagenedb.settings")
django.setup()

from metagenedb.apps.catalog.models import Gene, Function, Taxonomy  # noqa
from metagenedb.apps.catalog.serializers import GeneSerializer  # noqa

logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)

PHYLUM_COL = 'taxo_phylum'
GENUS_COL = 'taxo_genus'
SELECTED_KEYS = ['gene_id', 'length', 'kegg_ko', PHYLUM_COL, GENUS_COL]


def parse_gene(raw_line, selected_keys=SELECTED_KEYS):
    """
    Parse one annotation line with IGCLineParser and keep only selected keys.

    :param raw_line: one raw line of the annotation file
    :param selected_keys: keys of the parsed gene to keep (never mutated here)
    :return: dict restricted to the keys listed in ``selected_keys``
    """
    parsed_gene = IGCLineParser().gene(raw_line)
    return {k: v for k, v in parsed_gene.items() if k in selected_keys}


def _find_taxonomy(name, rank):
    """
    Return the first Taxonomy entry matching ``name`` and ``rank``, or None.

    A warning is logged when nothing matches or when the match is ambiguous.
    """
    # Fetch at most two rows: enough to tell "none" / "one" / "several" with a
    # single query instead of a count() followed by an index access.
    matches = list(Taxonomy.objects.filter(name=name, rank=rank)[:2])
    if not matches:
        _LOGGER.warning(f"No result found for {rank} {name}. Taxonomy is left unset.")
        return None
    if len(matches) > 1:
        _LOGGER.warning(f"More than 1 result found for {rank} {name}. First result is kept.")
    return matches[0]


def select_taxonomy(gene_dict, unknown_val='unknown'):
    """
    Select the taxonomy to be assigned to the gene.

    Genus has priority over phylum. The phylum and genus columns are always
    removed from ``gene_dict``. A ``taxonomy`` key holding the ``tax_id`` of
    the match is added only when the chosen name is found in the Taxonomy
    table; if both are unknown, or the lookup finds nothing, no ``taxonomy``
    key is set.

    :param gene_dict: parsed gene, modified in place
    :param unknown_val: value marking an unknown phylum/genus
    :return: the same ``gene_dict`` object
    """
    # A missing column is treated like an unknown value instead of raising.
    phylum = gene_dict.pop(PHYLUM_COL, unknown_val)
    genus = gene_dict.pop(GENUS_COL, unknown_val)
    if genus != unknown_val:
        taxonomy = _find_taxonomy(genus, "genus")
    elif phylum != unknown_val:
        taxonomy = _find_taxonomy(phylum, "phylum")
    else:
        taxonomy = None
    if taxonomy is not None:
        gene_dict['taxonomy'] = taxonomy.tax_id
    return gene_dict


class ImportIGCGenes(object):
    """Load the genes of an IGC annotation file into MetageneDB through its API."""

    # API client class; kept as a class attribute so it can be swapped out.
    METAGENEDB_GENE_API = MetageneDBCatalogGeneAPI

    def __init__(self, annotation_file, url, skip_tax=False, skip_functions=False):
        """
        :param annotation_file: path to the IGC annotation file
        :param url: base URL of the MetageneDB instance
        :param skip_tax: drop the ``taxonomy`` key of each gene before sending it
        :param skip_functions: drop the ``functions`` key of each gene before sending it
        """
        self.annotation_file = annotation_file
        self.url = url
        self.metagenedb_gene_api = self.METAGENEDB_GENE_API(base_url=self.url)
        # Skip some insertion if specified in script options
        self.skip_tax = skip_tax
        self.skip_functions = skip_functions

    def _clean_gene(self, gene_dict):
        """Slugify the gene ID and drop the keys the user asked to skip (in place)."""
        gene_dict['gene_id'] = slugify(gene_dict['gene_id'])
        # The keys may legitimately be absent (e.g. no taxonomy could be
        # resolved), so pop with a default rather than raising KeyError.
        if self.skip_tax:
            gene_dict.pop('taxonomy', None)
        if self.skip_functions:
            gene_dict.pop('functions', None)
        return gene_dict

    def _upsert_gene(self, gene_dict):
        """Update the gene if the API already knows its ID, create it otherwise."""
        clean_gene_dict = self._clean_gene(gene_dict)
        gene_id = clean_gene_dict['gene_id']
        try:
            # Try to get obj to check if it exists
            self.metagenedb_gene_api.get(gene_id)
        except HTTPError:
            self.metagenedb_gene_api.post(clean_gene_dict)
        else:
            # Kept out of the try block so that a failing update is reported
            # as such instead of being retried as a creation.
            self.metagenedb_gene_api.put(gene_id, clean_gene_dict)

    def _insert_gene_list(self, chunk_genes):
        """Parse and upsert every raw line of ``chunk_genes``; invalid genes are skipped."""
        for gene_line in chunk_genes:
            gene_dict = parse_gene(gene_line)
            gene_dict_with_taxo = select_taxonomy(gene_dict)
            try:
                self._upsert_gene(gene_dict_with_taxo)
            except ValidationError as e:
                _LOGGER.warning(f"{e.__dict__} for gene_id: {gene_dict.get('gene_id')}. Insertion skipped.")

    def load_annotation_file_to_db_in_chunks(self, chunk_size=100000):
        """
        Read the annotation file ``chunk_size`` lines at a time and load each chunk.

        :param chunk_size: number of lines read and processed per chunk
        """
        processed_genes = 0
        with open(self.annotation_file, 'r') as file:
            while True:
                chunk_genes = list(islice(file, chunk_size))
                if not chunk_genes:
                    break
                processed_genes += len(chunk_genes)
                self._insert_gene_list(chunk_genes)
                _LOGGER.info(f"{processed_genes} genes processed so far...")
        _LOGGER.info(f"[DONE] {processed_genes} genes processed.")


def parse_arguments():
    """
    Define the command-line parser and parse the arguments.

    Any exit requested by argparse (including ``--help``) is turned into an
    exit with status 1.
    """
    parser = argparse.ArgumentParser(description='Populate database from a given IGC annotation file.')
    # Common arguments for analysis and annotations
    parser.add_argument('annotation', help='IGC annotation file')
    # nargs='?' makes the positional optional so that the default can apply.
    parser.add_argument('url', nargs='?', help='base URL of the instance.', default='http://localhost/')
    parser.add_argument('--skip_taxonomy', action='store_true', help='Skip taxonomy information from genes.')
    parser.add_argument('--skip_functions', action='store_true', help='Skip functions information from genes.')
    try:
        return parser.parse_args()
    except SystemExit:
        sys.exit(1)


def run():
    """Script entry point: parse the arguments and load the annotation file."""
    args = parse_arguments()
    import_igc_genes = ImportIGCGenes(
        args.annotation,
        args.url,
        skip_tax=args.skip_taxonomy,
        skip_functions=args.skip_functions,
    )
    import_igc_genes.load_annotation_file_to_db_in_chunks()


if __name__ == "__main__":
    run()