Commit 26fc8187 authored by Kenzo-Hugo Hillion
Browse files

Add script to build KEGG directly from manage.py

parent e2013000
Pipeline #19636 passed with stages
in 2 minutes and 20 seconds
import logging
import requests
from django.core.management.base import BaseCommand
from django.db import IntegrityError
from metagenedb.apps.catalog.models import KeggOrthology
from metagenedb.common.utils.chunks import generate_chunks
from metagenedb.common.utils.parsers import KEGGLineParser
# Configure logging output as "[time] LEVEL:logger-name:message".
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
# Module-level logger; Command.set_logger_level adjusts its level from --verbosity.
logger = logging.getLogger(__name__)
class ImportKEGGKO(object):
    """Create or update ``KeggOrthology`` rows from the KEGG KO list endpoint.

    The list is downloaded in one request, each line is parsed with
    ``KEGGLineParser.ko_list`` and the result is written through the Django
    ORM. Progress counters are kept on the instance and logged along the way.
    """

    KEGG_KO_LIST_API = "http://rest.kegg.jp/list/ko"
    ORM_SOURCE_KEY = 'source'
    KEGG_SOURCE = 'kegg'
    # Seconds to wait for the KEGG server before giving up; without a timeout
    # a stalled connection would block the command forever.
    REQUEST_TIMEOUT = 60

    def __init__(self, kegg_ko_list_api=KEGG_KO_LIST_API):
        """Store the endpoint URL and reset every counter.

        Args:
            kegg_ko_list_api: URL returning the KEGG KO list, one entry per line.
        """
        self.kegg_ko_list_api = kegg_ko_list_api
        # Set here so the attribute exists even before the list is downloaded.
        self.total_kegg_nb = 0
        self.processed_kegg_count = 0
        self.created_kegg_count = 0
        self.updated_kegg_count = 0
        self.skipped_kegg_count = 0
        self.skipped_kegg_ids = []

    def _skip(self, function_id):
        """Record ``function_id`` as an entry that could not be saved."""
        self.skipped_kegg_ids.append(function_id)
        self.skipped_kegg_count += 1

    def _create_or_update(self, payload):
        """Insert ``payload`` as a new row, else update the row with the same ``function_id``.

        Exactly one of the created / updated / skipped counters is incremented.
        """
        # Function-scope import: keeps this block independent of the
        # module-level import list.
        from django.db import transaction

        function_id = payload.get('function_id')
        try:
            # Each write runs in its own atomic block so that a failed INSERT
            # is rolled back cleanly and the connection stays usable even when
            # the caller is already inside a transaction.
            with transaction.atomic():
                KeggOrthology(**payload).save()
        except IntegrityError:
            # Fall through to the update path below.
            pass
        else:
            self.created_kegg_count += 1
            return

        try:
            with transaction.atomic():
                kegg = KeggOrthology.objects.get(function_id=function_id)
                for field, value in payload.items():
                    setattr(kegg, field, value)
                kegg.save()
        except (IntegrityError, KeggOrthology.DoesNotExist):
            # DoesNotExist: the insert failed for a reason other than an
            # existing ``function_id``; skip the entry instead of aborting
            # the whole import.
            self._skip(function_id)
        else:
            self.updated_kegg_count += 1

    def load_all_kegg_ko(self, chunk_size=1000, test=False):
        """Download the KO list and create or update one row per entry.

        Args:
            chunk_size: number of lines handled between two progress messages.
            test: when true, stop after the first chunk.

        Raises:
            requests.RequestException: if the download fails, times out or
                returns an HTTP error status.
        """
        all_ko_response = requests.get(self.kegg_ko_list_api, timeout=self.REQUEST_TIMEOUT)
        all_ko_response.raise_for_status()
        all_ko = all_ko_response.text.splitlines()
        self.total_kegg_nb = len(all_ko)
        for chunk in generate_chunks(all_ko, chunk_size):
            ko_chunk = [KEGGLineParser.ko_list(line) for line in chunk]
            for entry in ko_chunk:
                # Empty strings are dropped so they are not passed to the model.
                payload = {k: v for k, v in entry.items() if v != ""}
                self._create_or_update(payload)
            self.processed_kegg_count += len(ko_chunk)
            logger.info("%s/%s KEGG KO processed so far...", self.processed_kegg_count, self.total_kegg_nb)
            if test:
                break
        logger.info("[DONE] %s/%s KEGG KO created.", self.created_kegg_count, self.total_kegg_nb)
        logger.info("[DONE] %s/%s KEGG KO updated.", self.updated_kegg_count, self.total_kegg_nb)
        logger.info("[DONE] %s/%s KEGG KO skipped. List: %s", self.skipped_kegg_count, self.total_kegg_nb,
                    self.skipped_kegg_ids)
class Command(BaseCommand):
    """Management command that runs ``ImportKEGGKO`` against the database."""

    help = 'Create or update all KEGG KO from KEGG API.'

    def add_arguments(self, parser):
        """Register the ``--test`` flag on the command's argument parser."""
        parser.add_argument(
            '--test',
            action='store_true',
            help='Run only on first 1000 entries.',
        )

    def set_logger_level(self, verbosity):
        """Raise the module logger to INFO (above 1) or DEBUG (above 2)."""
        if verbosity > 2:
            level = logging.DEBUG
        elif verbosity > 1:
            level = logging.INFO
        else:
            # Lower verbosity leaves the logger level untouched.
            return
        logger.setLevel(level)

    def handle(self, *args, **options):
        """Apply the requested verbosity, then launch the import."""
        verbosity = int(options['verbosity'])
        self.set_logger_level(verbosity)
        ImportKEGGKO().load_all_kegg_ko(test=options['test'])
# Generated by Django 2.2.7 on 2019-12-09 15:07
from django.db import migrations
class Migration(migrations.Migration):
    """Set ``verbose_name_plural`` on the ``eggnog`` and ``eggnogfunctionalcategory`` models."""

    dependencies = [('catalog', '0012_eggnog_eggnogfunctionalcategory')]

    # One AlterModelOptions operation per (model name, plural label) pair,
    # in the same order as the generated migration.
    operations = [
        migrations.AlterModelOptions(name=model_name, options={'verbose_name_plural': plural_label})
        for model_name, plural_label in (
            ('eggnog', 'EggNog'),
            ('eggnogfunctionalcategory', 'EggNog Functional categories'),
        )
    ]
#!/usr/bin/env python
import argparse
import logging
import requests
import sys
import time
from bioapi import MetageneDBCatalogKeggOrthologyAPI
from metagenedb.common.utils.chunks import generate_chunks
from metagenedb.common.utils.parsers import KEGGLineParser
# Set up logging with the library's default configuration.
logging.basicConfig()
# No name is passed, so this is the root logger; run() raises it to INFO with --verbose.
logger = logging.getLogger()
# KEGG REST endpoint used as the default source of the KO list.
KEGG_KO_LIST_API = "http://rest.kegg.jp/list/ko"
class ImportKEGGKO(object):
    """Send the KEGG KO list to a MetageneDB instance through its API client.

    The list is downloaded in one request, each line is parsed with
    ``KEGGLineParser.ko_list``, tagged with the KEGG source and sent in
    chunks with the client's ``put`` method.
    """

    METAGENEDB_FUNCTION_API = MetageneDBCatalogKeggOrthologyAPI
    ORM_SOURCE_KEY = 'source'
    KEGG_SOURCE = 'kegg'
    # Seconds to wait for the KEGG server before giving up; without a timeout
    # a stalled connection would block the script forever.
    REQUEST_TIMEOUT = 60

    def __init__(self, url, jwt_token, kegg_ko_list_api=KEGG_KO_LIST_API):
        """Build the API client and reset every counter.

        Args:
            url: base URL of the MetageneDB instance.
            jwt_token: JWT token handed to the API client.
            kegg_ko_list_api: URL returning the KEGG KO list, one entry per line.
        """
        self.kegg_ko_list_api = kegg_ko_list_api
        self.metagenedb_function_api = self.METAGENEDB_FUNCTION_API(base_url=url, jwt_token=jwt_token)
        # Set here so the attribute exists even before the list is downloaded.
        self.total_kegg_nb = 0
        self.processed_kegg = 0
        self.created_kegg = 0
        self.updated_kegg = 0

    def load_all_kegg_ko(self, chunk_size=1000):
        """Download the KO list and push it to the API chunk by chunk.

        Args:
            chunk_size: number of entries sent per ``put`` call.

        Raises:
            requests.RequestException: if the download fails, times out or
                returns an HTTP error status.
        """
        all_ko_response = requests.get(self.kegg_ko_list_api, timeout=self.REQUEST_TIMEOUT)
        all_ko_response.raise_for_status()
        all_ko = all_ko_response.text.splitlines()
        self.total_kegg_nb = len(all_ko)
        for chunk in generate_chunks(all_ko, chunk_size):
            ko_chunk = [KEGGLineParser.ko_list(line) for line in chunk]
            for entry in ko_chunk:
                entry[self.ORM_SOURCE_KEY] = self.KEGG_SOURCE
            response = self.metagenedb_function_api.put(ko_chunk)
            # The response is read as {'created': {'count': n}, 'updated': {'count': m}}.
            self.created_kegg += response.get('created').get('count')
            self.updated_kegg += response.get('updated').get('count')
            self.processed_kegg += len(ko_chunk)
            logger.info("%s/%s KEGG KO processed so far...", self.processed_kegg, self.total_kegg_nb)
            # Pause between chunks — presumably to avoid flooding the API; confirm.
            time.sleep(1)
        logger.info("[DONE] %s/%s KEGG KO created.", self.created_kegg, self.total_kegg_nb)
        logger.info("[DONE] %s/%s KEGG KO updated.", self.updated_kegg, self.total_kegg_nb)
def parse_arguments():
    """Parse the command line of the import script.

    Returns:
        The ``argparse`` namespace with ``url``, ``jwt_token`` and ``verbose``.

    Any exit requested by ``argparse`` is turned into exit status 1.
    """
    parser = argparse.ArgumentParser(
        description=f'Populate KEGG KO database from {KEGG_KO_LIST_API}.',
    )
    parser.add_argument(
        '--url',
        help='base URL of the instance.',
        default='http://localhost/',
    )
    parser.add_argument(
        '-t', '--jwt_token',
        help='your JWT token obtain from web app',
        required=True,
    )
    parser.add_argument('-v', '--verbose', action='store_true')
    try:
        namespace = parser.parse_args()
    except SystemExit:
        sys.exit(1)
    return namespace
def run():
    """Script entry point: parse the command line, then import every KEGG KO."""
    cli_args = parse_arguments()
    if cli_args.verbose:
        # --verbose makes the progress messages (logged at INFO) visible.
        logger.setLevel(logging.INFO)
    importer = ImportKEGGKO(cli_args.url, cli_args.jwt_token)
    importer.load_all_kegg_ko()


if __name__ == "__main__":
    run()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment