Skip to content
Snippets Groups Projects
Select Git revision
  • db4cb890321024ad64dc81f653bdc1940208a128
  • dev default
  • improve-source
  • improve-db-queries
  • master protected
5 results

import_igc_data.py

Blame
  • import_igc_data.py 9.34 KiB
    #!/usr/bin/env python
    import argparse
    import logging
    import os
    import sys
    from itertools import islice
    
    from bioapi import (
        MetageneDBCatalogFunctionAPI,
        MetageneDBCatalogGeneAPI,
        MetageneDBCatalogTaxonomyAPI,
        MetageneDBCatalogKeggOrthologyAPI,
        MetageneDBCatalogEggNogAPI
    )
    from requests.exceptions import HTTPError
    from slugify import slugify
    
    from metagenedb.common.utils.parsers import IGCLineParser
    
    logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
    logger = logging.getLogger()
    
    
    class ImportIGCGenes(object):
        METAGENEDB_GENE_API = MetageneDBCatalogGeneAPI
        METAGENEDB_TAXONOMY_API = MetageneDBCatalogTaxonomyAPI
        METAGENEDB_FUNCTION_API = MetageneDBCatalogFunctionAPI
        METAGENEDB_KEGG_API = MetageneDBCatalogKeggOrthologyAPI
        METAGENEDB_EGGNOG_API = MetageneDBCatalogEggNogAPI
    
        PHYLUM_COL = 'taxo_phylum'
        GENUS_COL = 'taxo_genus'
        SELECTED_KEYS = ['gene_id', 'length', 'kegg_ko', 'eggnog', PHYLUM_COL, GENUS_COL]
    
        def __init__(self, annotation_file, url, jwt_token, skip_tax=False, skip_functions=False):
            self.annotation_file = annotation_file
            self.url = url
            self._open_api_endpoints(jwt_token)
            self.total_genes = self._get_number_genes()
            self._reset_counters()
            # Skip some insertion if specified in script options
            self.skip_tax = skip_tax
            self.skip_functions = skip_functions
    
        def _reset_counters(self):
            self.processed_genes = 0
            self.created_genes = 0
            self.updated_genes = 0
            self.skipped_genes = 0
    
        def _open_api_endpoints(self, jwt_token):
            self.metagenedb_gene_api = self.METAGENEDB_GENE_API(base_url=self.url, jwt_token=jwt_token)
            self.metagenedb_taxonomy_api = self.METAGENEDB_TAXONOMY_API(base_url=self.url, jwt_token=jwt_token)
            self.metagenedb_function_api = self.METAGENEDB_FUNCTION_API(base_url=self.url, jwt_token=jwt_token)
            self.metagenedb_kegg_api = self.METAGENEDB_KEGG_API(base_url=self.url, jwt_token=jwt_token)
            self.metagenedb_eggnog_api = self.METAGENEDB_EGGNOG_API(base_url=self.url, jwt_token=jwt_token)
    
        def _build_taxo_mapping(self, rank, page_size=1000):
            logger.info("Building local mapping for %s level...", rank)
            counter = 1
            next_page = None
            mapping = {}
            while counter == 1 or next_page is not None:
                params = {
                    'page': counter,
                    'page_size': page_size,
                    'rank': rank,
                }
                current_page = self.metagenedb_taxonomy_api.get_all(params=params)
                next_page = current_page['next']
                mapping.update({
                    value['name']: value['tax_id'] for value in current_page['results']
                })
                counter += 1
            return mapping
    
        def _retrieve_function_catalog(self, api, page_size=1000):
            logger.info("Building local catalog from %s...", api.ROUTE)
            counter = 1
            next_page = None
            functions = set()
            while counter == 1 or next_page is not None:
                params = {
                    'page': counter,
                    'page_size': page_size,
                }
                current_page = api.get_all(params=params)
                next_page = current_page['next']
                functions = functions.union(set(
                    [item['function_id'] for item in current_page['results']]
                ))
                counter += 1
            return functions
    
        def build_function_mappings(self, page_size=1000):
            self.metagenedb_keggs = self._retrieve_function_catalog(self.metagenedb_kegg_api, page_size=page_size)
            self.metagenedb_eggnogs = self._retrieve_function_catalog(self.metagenedb_eggnog_api, page_size=page_size)
    
        def build_mapping(self, page_size=1000):
            self.phylum_mapping = self._build_taxo_mapping("phylum", page_size=page_size)
            self.genus_mapping = self._build_taxo_mapping("genus", page_size=page_size)
    
        def _get_number_genes(self):
            if not os.path.isfile(self.annotation_file):
                return 0
            with open(self.annotation_file) as f:
                for i, l in enumerate(f):
                    pass
            return i + 1
    
        def _select_taxonomy(self, gene_dict, unknown_val='unknown'):
            """
            Select the taxonomy to be assigned for the gene.
            genus has priority on phylum. If both unknow, remove the taxonomy key
            """
            phylum = gene_dict.pop(self.PHYLUM_COL)
            genus = gene_dict.pop(self.GENUS_COL)
            if self.skip_tax:
                return gene_dict
            taxonomy_id = None
            if genus != unknown_val:
                taxonomy_id = self.genus_mapping.get(genus, None)
                if taxonomy_id is None:
                    logger.warning("No tax_id found for genus %s" % genus)
            elif phylum != unknown_val:
                taxonomy_id = self.phylum_mapping.get(phylum, None)
                if taxonomy_id is None:
                    logger.warning("No tax_id found for phylum %s" % genus)
            if taxonomy_id is not None:
                gene_dict.update({'taxonomy': taxonomy_id})
            return gene_dict
    
        def _parse_gene(self, raw_line, selected_keys=SELECTED_KEYS):
            """
            Use IGCLineParser and return selected keys
            """
            gene_parser = IGCLineParser()
            all_dict = gene_parser.gene(raw_line)
            selected_dict = {k: v for k, v in all_dict.items() if k in selected_keys}
            return selected_dict
    
        def _clean_functions(self, functions):
            clean_functions = []
            for function in functions:
                if function['function_id'] in getattr(self, f"metagenedb_{function['source']}s"):
                    clean_functions.append(function['function_id'])
                else:
                    logger.warning("Function %s not found from %s in metagenedb",
                                   function['function_id'], function['source'])
            return clean_functions
    
        def _clean_gene(self, gene_dict):
            gene_dict['gene_name'] = gene_dict['gene_id']
            gene_dict['gene_id'] = slugify(gene_dict['gene_id'])
            gene_dict['functions'] = [
                {'source': 'kegg', 'function_id': v} for v in gene_dict.pop('kegg_ko') if v != 'unknown'] + \
                [{'source': 'eggnog', 'function_id': v} for v in gene_dict.pop('eggnog') if v != 'unknown']
            gene_dict = self._select_taxonomy(gene_dict)
            if self.skip_functions or not gene_dict['functions']:
                gene_dict.pop('functions')
            else:
                gene_dict['functions'] = self._clean_functions(gene_dict['functions'])
            return gene_dict
    
        def load_annotation_file_to_db_in_chunks(self, chunk_size=1000, test=False):
            if not self.skip_tax:
                self.build_mapping()
            if not self.skip_functions:
                self.build_function_mappings()
            with open(self.annotation_file, 'r') as file:
                while True:
                    chunk_genes = list(islice(file, chunk_size))
                    if not chunk_genes:
                        break
                    genes = [self._clean_gene(self._parse_gene(i)) for i in chunk_genes]
                    try:
                        response = self.metagenedb_gene_api.put(genes)
                        self.created_genes += response.get('created').get('count')
                        self.updated_genes += response.get('updated').get('count')
                    except HTTPError as http_error:
                        logging.warning("%s: %s; %s", http_error, http_error.response.json(), genes)
                        self.skipped_genes += len(genes)
                    self.processed_genes += len(chunk_genes)
                    logger.info("%s Genes processed so far...", self.processed_genes)
                    if test is True:
                        break
            logger.info("[DONE] %s/%s Genes created.", self.created_genes, self.total_genes)
            logger.info("[DONE] %s/%s Genes updated.", self.updated_genes, self.total_genes)
            logger.info("[DONE] %s/%s Genes skipped.", self.skipped_genes, self.total_genes)
    
    
    def parse_arguments():
        """
        Defines parser.
        """
        parser = argparse.ArgumentParser(description='Populate database from a given IGC annotation file.')
        # Common arguments for analysis and annotations
        parser.add_argument('annotation', help='IGC annotation file')
        parser.add_argument('--url', help='base URL of the instance.', default='http://localhost/')
        parser.add_argument('-t', '--jwt_token', help='your JWT token obtain from web app', required=True)
        parser.add_argument('--chunk_size', type=int, default=1000,
                            help='How many genes to handle and create in the same time.')
        parser.add_argument('--skip_taxonomy', action='store_true', help='Skip taxonomy information from genes.')
        parser.add_argument('--skip_functions', action='store_true', help='Skip functions information from genes.')
        parser.add_argument('--test', action='store_true', help='Run only on first chunk.')
        parser.add_argument('-v', '--verbose', action='store_true')
    
        try:
            return parser.parse_args()
        except SystemExit:
            sys.exit(1)
    
    
    def run():
        args = parse_arguments()
        if args.verbose:
            logger.setLevel(logging.INFO)
        import_igc_genes = ImportIGCGenes(args.annotation, args.url, args.jwt_token,
                                          skip_tax=args.skip_taxonomy, skip_functions=args.skip_functions)
        import_igc_genes.load_annotation_file_to_db_in_chunks(chunk_size=args.chunk_size, test=args.test)
    
    
    if __name__ == "__main__":
        run()