#!/usr/bin/env python import argparse import logging import os import sys from itertools import islice import django from rest_framework.exceptions import ValidationError from metagenedb.common.utils.parsers import IGCLineParser # Before model import, we need to called django.setup() to Load apps os.environ.setdefault("DJANGO_SETTINGS_MODULE", "metagenedb.settings") django.setup() from metagenedb.apps.catalog.models import Gene, Function, Taxonomy # noqa from metagenedb.apps.catalog.serializers import GeneSerializer # noqa logging.basicConfig(level=logging.INFO) _LOGGER = logging.getLogger(__name__) PHYLUM_COL = 'taxo_phylum' GENUS_COL = 'taxo_genus' SELECTED_KEYS = ['gene_id', 'gene_length', 'kegg_ko', PHYLUM_COL, GENUS_COL] def parse_gene(raw_line, selected_keys=SELECTED_KEYS): """ Use IGCLineParser and return selected keys """ gene_parser = IGCLineParser() all_dict = gene_parser.gene(raw_line) selected_dict = {k: v for k, v in all_dict.items() if k in selected_keys} return selected_dict def select_taxonomy(gene_dict, unknown_val='unknown'): """ Select the taxonomy to be assigned for the gene. genus has priority on phylum. If both unknow, remove the taxonomy key """ phylum = gene_dict.pop(PHYLUM_COL) genus = gene_dict.pop(GENUS_COL) if genus != unknown_val: queryset = Taxonomy.objects.filter(name=genus, rank="genus") if queryset.count() > 1: _LOGGER.warning(f"More than 1 result found for genus {genus}. First result is kept.") gene_dict.update( {'taxonomy': queryset[0].tax_id} ) elif phylum != unknown_val: queryset = Taxonomy.objects.filter(name=phylum, rank="phylum") if queryset.count() > 1: _LOGGER.warning(f"More than 1 result found for phylum {phylum}. First result is kept.") gene_dict.update( {'taxonomy': queryset[0].tax_id} ) return gene_dict def upsert_gene(gene_dict): try: gene_obj = Gene.objects.get(gene_id=gene_dict.get('gene_id')) serializer = GeneSerializer(gene_obj, data=gene_dict) except Gene.DoesNotExist: serializer = GeneSerializer(data=gene_dict) serializer.is_valid(raise_exception=True) serializer.save() def insert_gene_list(chunk_genes): for gene_line in chunk_genes: gene_dict = parse_gene(gene_line) gene_dict_with_taxo = select_taxonomy(gene_dict) try: upsert_gene(gene_dict_with_taxo) except ValidationError as e: _LOGGER.warning(f"{e.__dict__} for gene_id: {gene_dict.get('gene_id')}. Insertion skipped.") def load_annotation_file_to_db_in_chunks(annotation_file, chunk_size=100000): processed_genes = 0 with open(annotation_file, 'r') as file: while True: chunk_genes = list(islice(file, chunk_size)) if not chunk_genes: break processed_genes += len(chunk_genes) insert_gene_list(chunk_genes) _LOGGER.info(f"{processed_genes} genes processed so far...") _LOGGER.info(f"[DONE] {processed_genes} genes processed.") def parse_arguments(): """ Defines parser. """ parser = argparse.ArgumentParser(description='Populate database from a given IGC annotation file.') # Common arguments for analysis and annotations parser.add_argument('annotation', help='IGC annotation file') parser.add_argument('--delete_all', action='store_true', help='Empty database before insertion.') try: return parser.parse_args() except SystemExit: sys.exit(1) def run(): args = parse_arguments() if args.delete_all: Gene.objects.all().delete() load_annotation_file_to_db_in_chunks(args.annotation) if __name__ == "__main__": run()