Select Git revision
import_igc_data.py 5.99 KiB
#!/usr/bin/env python
import argparse
import logging
import os
import sys
from itertools import islice
from bioapi import MetageneDBCatalogGeneAPI, MetageneDBCatalogTaxonomyAPI
from requests.exceptions import HTTPError
from slugify import slugify
from metagenedb.common.utils.parsers import IGCLineParser
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
logger = logging.getLogger()
class ImportIGCGenes(object):
METAGENEDB_GENE_API = MetageneDBCatalogGeneAPI
METAGENEDB_TAXONOMY_API = MetageneDBCatalogTaxonomyAPI
PHYLUM_COL = 'taxo_phylum'
GENUS_COL = 'taxo_genus'
SELECTED_KEYS = ['gene_id', 'length', 'kegg_ko', PHYLUM_COL, GENUS_COL]
def __init__(self, annotation_file, url, skip_tax=False, skip_functions=False):
self.annotation_file = annotation_file
self.url = url
self.metagenedb_gene_api = self.METAGENEDB_GENE_API(base_url=self.url)
self.metagenedb_taxonomy_api = self.METAGENEDB_TAXONOMY_API(base_url=self.url)
self.total_genes = self._get_number_genes()
self._reset_counters()
# Skip some insertion if specified in script options
self.skip_tax = skip_tax
self.skip_functions = skip_functions
def _reset_counters(self):
self.processed_genes = 0
self.created_genes = 0
self.updated_genes = 0
self.skipped_genes = 0
def _get_number_genes(self):
if not os.path.isfile(self.annotation_file):
return 0
with open(self.annotation_file) as f:
for i, l in enumerate(f):
pass
return i + 1
def _select_taxonomy(self, gene_dict, unknown_val='unknown'):
"""
Select the taxonomy to be assigned for the gene.
genus has priority on phylum. If both unknow, remove the taxonomy key
"""
phylum = gene_dict.pop(self.PHYLUM_COL)
genus = gene_dict.pop(self.GENUS_COL)
if self.skip_tax:
return gene_dict
resp_dict = {}
if genus != unknown_val:
resp_dict = self.metagenedb_taxonomy_api.get_all(params={'name': genus, 'rank': 'genus'})
if len(resp_dict['results']) > 1:
logger.warning(f"More than 1 result found for genus {genus}. First result is kept.")
elif phylum != unknown_val:
resp_dict = self.metagenedb_taxonomy_api.get_all(params={'name': phylum, 'rank': 'phylum'})
if len(resp_dict['results']) > 1:
logger.warning(f"More than 1 result found for phylum {phylum}. First result is kept.")
if resp_dict.get('count', 0) > 0:
gene_dict.update({'taxonomy': resp_dict['results'][0]['tax_id']})
return gene_dict
def _parse_gene(self, raw_line, selected_keys=SELECTED_KEYS):
"""
Use IGCLineParser and return selected keys
"""
gene_parser = IGCLineParser()
all_dict = gene_parser.gene(raw_line)
selected_dict = {k: v for k, v in all_dict.items() if k in selected_keys}
return selected_dict
def _clean_gene(self, gene_dict):
gene_dict['gene_name'] = gene_dict['gene_id']
gene_dict['gene_id'] = slugify(gene_dict['gene_id'])
gene_dict['functions'] = gene_dict.pop('kegg_ko')
gene_dict = self._select_taxonomy(gene_dict)
if self.skip_functions or 'unknown' in gene_dict['functions']:
gene_dict.pop('functions')
return gene_dict
def load_annotation_file_to_db_in_chunks(self, chunk_size=1000, test=False):
with open(self.annotation_file, 'r') as file:
while True:
chunk_genes = list(islice(file, chunk_size))
if not chunk_genes:
break
genes = [self._clean_gene(self._parse_gene(i)) for i in chunk_genes]
try:
response = self.metagenedb_gene_api.put(genes)
self.created_genes += response.get('created').get('count')
self.updated_genes += response.get('updated').get('count')
except HTTPError as http_error:
logging.warning("%s: %s; %s", http_error, http_error.response.json(), genes)
self.skipped_genes += len(genes)
self.processed_genes += len(chunk_genes)
logger.info("%s Genes processed so far...", self.processed_genes)
if test is True:
break
logger.info("[DONE] %s/%s Genes created.", self.created_genes, self.total_genes)
logger.info("[DONE] %s/%s Genes updated.", self.updated_genes, self.total_genes)
logger.info("[DONE] %s/%s Genes skipped.", self.skipped_genes, self.total_genes)
def parse_arguments():
"""
Defines parser.
"""
parser = argparse.ArgumentParser(description='Populate database from a given IGC annotation file.')
# Common arguments for analysis and annotations
parser.add_argument('annotation', help='IGC annotation file')
parser.add_argument('--url', help='base URL of the instance.', default='http://localhost/')
parser.add_argument('--chunk_size', type=int, default=1000, help='How many genes to handle and create in the same time.')
parser.add_argument('--skip_taxonomy', action='store_true', help='Skip taxonomy information from genes.')
parser.add_argument('--skip_functions', action='store_true', help='Skip functions information from genes.')
parser.add_argument('--test', action='store_true', help='Run only on first chunk.')
parser.add_argument('-v', '--verbose', action='store_true')
try:
return parser.parse_args()
except SystemExit:
sys.exit(1)
def run():
args = parse_arguments()
if args.verbose:
logger.setLevel(logging.INFO)
import_igc_genes = ImportIGCGenes(args.annotation, args.url,
skip_tax=args.skip_taxonomy, skip_functions=args.skip_functions)
import_igc_genes.load_annotation_file_to_db_in_chunks(chunk_size=args.chunk_size, test=args.test)
if __name__ == "__main__":
run()