Select Git revision
import_igc_data.py
Kenzo-Hugo Hillion authored
import_igc_data.py 9.34 KiB
#!/usr/bin/env python
import argparse
import logging
import os
import sys
from itertools import islice
from bioapi import (
MetageneDBCatalogFunctionAPI,
MetageneDBCatalogGeneAPI,
MetageneDBCatalogTaxonomyAPI,
MetageneDBCatalogKeggOrthologyAPI,
MetageneDBCatalogEggNogAPI
)
from requests.exceptions import HTTPError
from slugify import slugify
from metagenedb.common.utils.parsers import IGCLineParser
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
logger = logging.getLogger()
class ImportIGCGenes(object):
METAGENEDB_GENE_API = MetageneDBCatalogGeneAPI
METAGENEDB_TAXONOMY_API = MetageneDBCatalogTaxonomyAPI
METAGENEDB_FUNCTION_API = MetageneDBCatalogFunctionAPI
METAGENEDB_KEGG_API = MetageneDBCatalogKeggOrthologyAPI
METAGENEDB_EGGNOG_API = MetageneDBCatalogEggNogAPI
PHYLUM_COL = 'taxo_phylum'
GENUS_COL = 'taxo_genus'
SELECTED_KEYS = ['gene_id', 'length', 'kegg_ko', 'eggnog', PHYLUM_COL, GENUS_COL]
def __init__(self, annotation_file, url, jwt_token, skip_tax=False, skip_functions=False):
self.annotation_file = annotation_file
self.url = url
self._open_api_endpoints(jwt_token)
self.total_genes = self._get_number_genes()
self._reset_counters()
# Skip some insertion if specified in script options
self.skip_tax = skip_tax
self.skip_functions = skip_functions
def _reset_counters(self):
self.processed_genes = 0
self.created_genes = 0
self.updated_genes = 0
self.skipped_genes = 0
def _open_api_endpoints(self, jwt_token):
self.metagenedb_gene_api = self.METAGENEDB_GENE_API(base_url=self.url, jwt_token=jwt_token)
self.metagenedb_taxonomy_api = self.METAGENEDB_TAXONOMY_API(base_url=self.url, jwt_token=jwt_token)
self.metagenedb_function_api = self.METAGENEDB_FUNCTION_API(base_url=self.url, jwt_token=jwt_token)
self.metagenedb_kegg_api = self.METAGENEDB_KEGG_API(base_url=self.url, jwt_token=jwt_token)
self.metagenedb_eggnog_api = self.METAGENEDB_EGGNOG_API(base_url=self.url, jwt_token=jwt_token)
def _build_taxo_mapping(self, rank, page_size=1000):
logger.info("Building local mapping for %s level...", rank)
counter = 1
next_page = None
mapping = {}
while counter == 1 or next_page is not None:
params = {
'page': counter,
'page_size': page_size,
'rank': rank,
}
current_page = self.metagenedb_taxonomy_api.get_all(params=params)
next_page = current_page['next']
mapping.update({
value['name']: value['tax_id'] for value in current_page['results']
})
counter += 1
return mapping
def _retrieve_function_catalog(self, api, page_size=1000):
logger.info("Building local catalog from %s...", api.ROUTE)
counter = 1
next_page = None
functions = set()
while counter == 1 or next_page is not None:
params = {
'page': counter,
'page_size': page_size,
}
current_page = api.get_all(params=params)
next_page = current_page['next']
functions = functions.union(set(
[item['function_id'] for item in current_page['results']]
))
counter += 1
return functions
def build_function_mappings(self, page_size=1000):
self.metagenedb_keggs = self._retrieve_function_catalog(self.metagenedb_kegg_api, page_size=page_size)
self.metagenedb_eggnogs = self._retrieve_function_catalog(self.metagenedb_eggnog_api, page_size=page_size)
def build_mapping(self, page_size=1000):
self.phylum_mapping = self._build_taxo_mapping("phylum", page_size=page_size)
self.genus_mapping = self._build_taxo_mapping("genus", page_size=page_size)
def _get_number_genes(self):
if not os.path.isfile(self.annotation_file):
return 0
with open(self.annotation_file) as f:
for i, l in enumerate(f):
pass
return i + 1
def _select_taxonomy(self, gene_dict, unknown_val='unknown'):
"""
Select the taxonomy to be assigned for the gene.
genus has priority on phylum. If both unknow, remove the taxonomy key
"""
phylum = gene_dict.pop(self.PHYLUM_COL)
genus = gene_dict.pop(self.GENUS_COL)
if self.skip_tax:
return gene_dict
taxonomy_id = None
if genus != unknown_val:
taxonomy_id = self.genus_mapping.get(genus, None)
if taxonomy_id is None:
logger.warning("No tax_id found for genus %s" % genus)
elif phylum != unknown_val:
taxonomy_id = self.phylum_mapping.get(phylum, None)
if taxonomy_id is None:
logger.warning("No tax_id found for phylum %s" % genus)
if taxonomy_id is not None:
gene_dict.update({'taxonomy': taxonomy_id})
return gene_dict
def _parse_gene(self, raw_line, selected_keys=SELECTED_KEYS):
"""
Use IGCLineParser and return selected keys
"""
gene_parser = IGCLineParser()
all_dict = gene_parser.gene(raw_line)
selected_dict = {k: v for k, v in all_dict.items() if k in selected_keys}
return selected_dict
def _clean_functions(self, functions):
clean_functions = []
for function in functions:
if function['function_id'] in getattr(self, f"metagenedb_{function['source']}s"):
clean_functions.append(function['function_id'])
else:
logger.warning("Function %s not found from %s in metagenedb",
function['function_id'], function['source'])
return clean_functions
def _clean_gene(self, gene_dict):
gene_dict['gene_name'] = gene_dict['gene_id']
gene_dict['gene_id'] = slugify(gene_dict['gene_id'])
gene_dict['functions'] = [
{'source': 'kegg', 'function_id': v} for v in gene_dict.pop('kegg_ko') if v != 'unknown'] + \
[{'source': 'eggnog', 'function_id': v} for v in gene_dict.pop('eggnog') if v != 'unknown']
gene_dict = self._select_taxonomy(gene_dict)
if self.skip_functions or not gene_dict['functions']:
gene_dict.pop('functions')
else:
gene_dict['functions'] = self._clean_functions(gene_dict['functions'])
return gene_dict
def load_annotation_file_to_db_in_chunks(self, chunk_size=1000, test=False):
if not self.skip_tax:
self.build_mapping()
if not self.skip_functions:
self.build_function_mappings()
with open(self.annotation_file, 'r') as file:
while True:
chunk_genes = list(islice(file, chunk_size))
if not chunk_genes:
break
genes = [self._clean_gene(self._parse_gene(i)) for i in chunk_genes]
try:
response = self.metagenedb_gene_api.put(genes)
self.created_genes += response.get('created').get('count')
self.updated_genes += response.get('updated').get('count')
except HTTPError as http_error:
logging.warning("%s: %s; %s", http_error, http_error.response.json(), genes)
self.skipped_genes += len(genes)
self.processed_genes += len(chunk_genes)
logger.info("%s Genes processed so far...", self.processed_genes)
if test is True:
break
logger.info("[DONE] %s/%s Genes created.", self.created_genes, self.total_genes)
logger.info("[DONE] %s/%s Genes updated.", self.updated_genes, self.total_genes)
logger.info("[DONE] %s/%s Genes skipped.", self.skipped_genes, self.total_genes)
def parse_arguments():
"""
Defines parser.
"""
parser = argparse.ArgumentParser(description='Populate database from a given IGC annotation file.')
# Common arguments for analysis and annotations
parser.add_argument('annotation', help='IGC annotation file')
parser.add_argument('--url', help='base URL of the instance.', default='http://localhost/')
parser.add_argument('-t', '--jwt_token', help='your JWT token obtain from web app', required=True)
parser.add_argument('--chunk_size', type=int, default=1000,
help='How many genes to handle and create in the same time.')
parser.add_argument('--skip_taxonomy', action='store_true', help='Skip taxonomy information from genes.')
parser.add_argument('--skip_functions', action='store_true', help='Skip functions information from genes.')
parser.add_argument('--test', action='store_true', help='Run only on first chunk.')
parser.add_argument('-v', '--verbose', action='store_true')
try:
return parser.parse_args()
except SystemExit:
sys.exit(1)
def run():
args = parse_arguments()
if args.verbose:
logger.setLevel(logging.INFO)
import_igc_genes = ImportIGCGenes(args.annotation, args.url, args.jwt_token,
skip_tax=args.skip_taxonomy, skip_functions=args.skip_functions)
import_igc_genes.load_annotation_file_to_db_in_chunks(chunk_size=args.chunk_size, test=args.test)
if __name__ == "__main__":
run()