Skip to content
Snippets Groups Projects
Select Git revision
  • 7d8535532a67c170db81ccb8717c1c96bc52e8a0
  • dev default
  • improve-source
  • improve-db-queries
  • master protected
5 results

import_igc_data.py

Blame
  • import_igc_data.py 5.99 KiB
    #!/usr/bin/env python
    import argparse
    import logging
    import os
    import sys
    from itertools import islice
    
    from bioapi import MetageneDBCatalogGeneAPI, MetageneDBCatalogTaxonomyAPI
    from requests.exceptions import HTTPError
    from slugify import slugify
    
    from metagenedb.common.utils.parsers import IGCLineParser
    
    logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
    logger = logging.getLogger()
    
    
    class ImportIGCGenes(object):
        METAGENEDB_GENE_API = MetageneDBCatalogGeneAPI
        METAGENEDB_TAXONOMY_API = MetageneDBCatalogTaxonomyAPI
    
        PHYLUM_COL = 'taxo_phylum'
        GENUS_COL = 'taxo_genus'
        SELECTED_KEYS = ['gene_id', 'length', 'kegg_ko', PHYLUM_COL, GENUS_COL]
    
        def __init__(self, annotation_file, url, skip_tax=False, skip_functions=False):
            self.annotation_file = annotation_file
            self.url = url
            self.metagenedb_gene_api = self.METAGENEDB_GENE_API(base_url=self.url)
            self.metagenedb_taxonomy_api = self.METAGENEDB_TAXONOMY_API(base_url=self.url)
            self.total_genes = self._get_number_genes()
            self._reset_counters()
            # Skip some insertion if specified in script options
            self.skip_tax = skip_tax
            self.skip_functions = skip_functions
    
        def _reset_counters(self):
            self.processed_genes = 0
            self.created_genes = 0
            self.updated_genes = 0
            self.skipped_genes = 0
    
        def _get_number_genes(self):
            if not os.path.isfile(self.annotation_file):
                return 0
            with open(self.annotation_file) as f:
                for i, l in enumerate(f):
                    pass
            return i + 1
    
        def _select_taxonomy(self, gene_dict, unknown_val='unknown'):
            """
            Select the taxonomy to be assigned for the gene.
            genus has priority on phylum. If both unknow, remove the taxonomy key
            """
            phylum = gene_dict.pop(self.PHYLUM_COL)
            genus = gene_dict.pop(self.GENUS_COL)
            if self.skip_tax:
                return gene_dict
            resp_dict = {}
            if genus != unknown_val:
                resp_dict = self.metagenedb_taxonomy_api.get_all(params={'name': genus, 'rank': 'genus'})
                if len(resp_dict['results']) > 1:
                    logger.warning(f"More than 1 result found for genus {genus}. First result is kept.")
            elif phylum != unknown_val:
                resp_dict = self.metagenedb_taxonomy_api.get_all(params={'name': phylum, 'rank': 'phylum'})
                if len(resp_dict['results']) > 1:
                    logger.warning(f"More than 1 result found for phylum {phylum}. First result is kept.")
            if resp_dict.get('count', 0) > 0:
                gene_dict.update({'taxonomy': resp_dict['results'][0]['tax_id']})
            return gene_dict
    
        def _parse_gene(self, raw_line, selected_keys=SELECTED_KEYS):
            """
            Use IGCLineParser and return selected keys
            """
            gene_parser = IGCLineParser()
            all_dict = gene_parser.gene(raw_line)
            selected_dict = {k: v for k, v in all_dict.items() if k in selected_keys}
            return selected_dict
    
        def _clean_gene(self, gene_dict):
            gene_dict['gene_name'] = gene_dict['gene_id']
            gene_dict['gene_id'] = slugify(gene_dict['gene_id'])
            gene_dict['functions'] = gene_dict.pop('kegg_ko')
            gene_dict = self._select_taxonomy(gene_dict)
            if self.skip_functions or 'unknown' in gene_dict['functions']:
                gene_dict.pop('functions')
            return gene_dict
    
        def load_annotation_file_to_db_in_chunks(self, chunk_size=1000, test=False):
            with open(self.annotation_file, 'r') as file:
                while True:
                    chunk_genes = list(islice(file, chunk_size))
                    if not chunk_genes:
                        break
                    genes = [self._clean_gene(self._parse_gene(i)) for i in chunk_genes]
                    try:
                        response = self.metagenedb_gene_api.put(genes)
                        self.created_genes += response.get('created').get('count')
                        self.updated_genes += response.get('updated').get('count')
                    except HTTPError as http_error:
                        logging.warning("%s: %s; %s", http_error, http_error.response.json(), genes)
                        self.skipped_genes += len(genes)
                    self.processed_genes += len(chunk_genes)
                    logger.info("%s Genes processed so far...", self.processed_genes)
                    if test is True:
                        break
            logger.info("[DONE] %s/%s Genes created.", self.created_genes, self.total_genes)
            logger.info("[DONE] %s/%s Genes updated.", self.updated_genes, self.total_genes)
            logger.info("[DONE] %s/%s Genes skipped.", self.skipped_genes, self.total_genes)
    
    
    def parse_arguments():
        """
        Defines parser.
        """
        parser = argparse.ArgumentParser(description='Populate database from a given IGC annotation file.')
        # Common arguments for analysis and annotations
        parser.add_argument('annotation', help='IGC annotation file')
        parser.add_argument('--url', help='base URL of the instance.', default='http://localhost/')
        parser.add_argument('--chunk_size', type=int, default=1000, help='How many genes to handle and create in the same time.')
        parser.add_argument('--skip_taxonomy', action='store_true', help='Skip taxonomy information from genes.')
        parser.add_argument('--skip_functions', action='store_true', help='Skip functions information from genes.')
        parser.add_argument('--test', action='store_true', help='Run only on first chunk.')
        parser.add_argument('-v', '--verbose', action='store_true')
    
        try:
            return parser.parse_args()
        except SystemExit:
            sys.exit(1)
    
    
    def run():
        args = parse_arguments()
        if args.verbose:
            logger.setLevel(logging.INFO)
        import_igc_genes = ImportIGCGenes(args.annotation, args.url,
                                          skip_tax=args.skip_taxonomy, skip_functions=args.skip_functions)
        import_igc_genes.load_annotation_file_to_db_in_chunks(chunk_size=args.chunk_size, test=args.test)
    
    
    if __name__ == "__main__":
        run()