Commit d4cb92b0 authored by Kenzo-Hugo Hillion
Browse files

Add KEGG functions to Gene and a script to load them from the API

parent 4557118c
from django.contrib import admin
from .models import Gene, Function
from .models import Gene, Function, KeggOrthology
@admin.register(Gene)
class GeneAdmin(admin.ModelAdmin):
    """Admin configuration for Gene: lists each gene with its linked functions."""

    list_display = ('gene_id', 'gene_length', 'get_functions')
    search_fields = ('gene_id',)

    def get_queryset(self, request):
        """Return the admin queryset with the ``functions`` relation prefetched."""
        # get_functions() reads obj.functions.all() for every displayed row;
        # prefetching avoids issuing one extra query per gene.
        return super().get_queryset(request).prefetch_related('functions')

    def get_functions(self, obj):
        """Return the string form of every function linked to ``obj``, comma-joined."""
        return ",".join(str(f) for f in obj.functions.all())
    get_functions.short_description = 'Functions'
@admin.register(KeggOrthology)
class KeggOrthologyAdmin(admin.ModelAdmin):
    """Admin configuration for KeggOrthology entries."""

    # The search box matches on function_id only.
    search_fields = ('function_id',)
    # Columns shown in the change list.
    list_display = (
        'function_id',
        'name',
        'long_name',
        'ec_number',
        'source',
    )
@admin.register(Function)
class FunctionAdmin(admin.ModelAdmin):
    """Admin configuration for Function entries."""

    # Single definition of the change-list columns; an earlier two-column
    # assignment was immediately overwritten and has been dropped.
    list_display = ('function_id', 'name', 'source')
    search_fields = ('function_id',)
# Generated by Django 2.2.1 on 2019-06-17 14:38
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
    """Initial migration creating the Function, KeggOrthology and Gene models."""

    initial = True

    dependencies = []

    operations = [
        # Base table for every kind of function annotation.
        migrations.CreateModel(
            name='Function',
            fields=[
                (
                    'id',
                    models.AutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name='ID',
                    ),
                ),
                ('function_id', models.CharField(db_index=True, max_length=100)),
                ('name', models.CharField(max_length=100)),
                (
                    'source',
                    models.CharField(
                        choices=[
                            ('undef', 'Undefined'),
                            ('kegg', 'KEGG'),
                            ('eggnog', 'EggNOG'),
                        ],
                        default='undef',
                        max_length=10,
                    ),
                ),
            ],
        ),
        # KeggOrthology extends Function through a parent-link one-to-one key.
        migrations.CreateModel(
            name='KeggOrthology',
            fields=[
                (
                    'function_ptr',
                    models.OneToOneField(
                        auto_created=True,
                        on_delete=django.db.models.deletion.CASCADE,
                        parent_link=True,
                        primary_key=True,
                        serialize=False,
                        to='catalog.Function',
                    ),
                ),
                ('ec_number', models.CharField(blank=True, default='', max_length=200)),
                ('long_name', models.CharField(max_length=500)),
            ],
            options={
                'verbose_name_plural': 'Kegg orthologies',
            },
            bases=('catalog.function',),
        ),
        # Genes reference any number of functions.
        migrations.CreateModel(
            name='Gene',
            fields=[
                (
                    'id',
                    models.AutoField(
                        auto_created=True,
                        primary_key=True,
                        serialize=False,
                        verbose_name='ID',
                    ),
                ),
                ('gene_id', models.CharField(db_index=True, max_length=100, unique=True)),
                ('gene_length', models.IntegerField()),
                ('functions', models.ManyToManyField(to='catalog.Function')),
            ],
        ),
    ]
......@@ -11,7 +11,7 @@ class Function(models.Model):
(EGGNOG, 'EggNOG')
]
function_id = models.CharField(max_length=100, unique=True, db_index=True)
function_id = models.CharField(max_length=100, db_index=True)
name = models.CharField(max_length=100)
source = models.CharField(max_length=10, choices=SOURCE_CHOICES, default=UNDEFINED)
......
......@@ -6,7 +6,8 @@ from .function import Function
class Gene(models.Model):
    """A catalog gene identified by ``gene_id`` and linked to zero or more functions."""

    gene_id = models.CharField(max_length=100, unique=True, db_index=True)
    gene_length = models.IntegerField()
    # null has no effect on a ManyToManyField (Django system check fields.W340):
    # the relation is stored in a separate join table. blank=True is the option
    # that lets forms and the admin accept a gene without any function.
    # NOTE(review): changing ``blank`` needs a follow-up ``makemigrations``.
    functions = models.ManyToManyField(Function, blank=True)

    def __str__(self):
        return self.gene_id
from rest_framework import serializers
from .models import Gene
from .models import Gene, Function
class FunctionSerializer(serializers.ModelSerializer):
    """Serialize a Function as its function_id, source and name."""

    class Meta:
        model = Function
        fields = (
            'function_id',
            'source',
            'name',
        )
class GeneSerializer(serializers.ModelSerializer):
    """Serialize a Gene together with its linked functions (nested, read-only)."""

    functions = FunctionSerializer(many=True, read_only=True)

    class Meta:
        model = Gene
        # Single definition of the exposed fields; an earlier assignment
        # without 'functions' was immediately overwritten and has been dropped.
        fields = ('gene_id', 'gene_length', 'functions')
#!/usr/bin/env python
import argparse
import logging
import os
......@@ -11,13 +12,13 @@ from django.core.exceptions import ValidationError
# Point Django at the project settings unless the variable is already set.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "metagenedb.settings")
# Load the Django apps; this has to happen before the model imports below.
django.setup()
from metagenedb.apps.catalog.models import Gene
from metagenedb.apps.catalog.models import Gene, Function
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
def create_gene(raw_line):
def parse_gene(raw_line):
"""
IGC annotation columns:
0: Gene ID Unique ID
......@@ -37,36 +38,61 @@ def create_gene(raw_line):
representative gene or a redundant gene belonging to it
"""
gene_info = raw_line.rstrip().split('\t')
gene = Gene(gene_id=gene_info[1],
gene_length=gene_info[2])
return gene
return {
'gene_id': gene_info[1],
'gene_length': gene_info[2],
'kegg_ko': gene_info[7]
}
# Validate and save an already-built Gene instance.
# NOTE(review): a later insert_gene(gene_dict) definition in this file rebinds
# the same name, so this version looks superseded — confirm it can be removed.
def insert_gene(gene):
# full_clean() raises ValidationError on invalid field values before saving.
gene.full_clean()
gene.save()
def link_to_function(obj_gene, gene_dict):
    """
    Attach to ``obj_gene`` the Function whose function_id is ``gene_dict['kegg_ko']``.

    When no such Function exists, a warning is logged and the gene is left
    without the link.
    """
    try:
        function = Function.objects.get(function_id=gene_dict.get('kegg_ko'))
    except Function.DoesNotExist:
        _LOGGER.warning(f"{gene_dict.get('kegg_ko')} not found in the database {gene_dict}.")
    else:
        # Link first, then re-validate and save the gene itself.
        obj_gene.functions.add(function)
        obj_gene.full_clean()
        obj_gene.save()
def insert_gene(gene_dict):
    """
    Create or update the Gene described by ``gene_dict``, then link its KEGG KO.

    An existing gene (matched on 'gene_id') gets every non many-to-many key of
    ``gene_dict`` copied onto it; otherwise a new Gene is built from 'gene_id'
    and 'gene_length'. The instance is validated with full_clean() and saved.
    The KEGG link is added unless 'kegg_ko' equals 'unknown'.
    """
    MANY_TO_MANY_FIELDS = ['kegg_ko']
    gene_id = gene_dict.get('gene_id')
    try:
        obj_gene = Gene.objects.get(gene_id=gene_id)
    except Gene.DoesNotExist:
        obj_gene = Gene(gene_id=gene_id,
                        gene_length=gene_dict.get('gene_length'))
    else:
        for key, value in gene_dict.items():
            if key in MANY_TO_MANY_FIELDS:
                # Relations are handled separately once the gene is saved.
                continue
            setattr(obj_gene, key, value)
    obj_gene.full_clean()
    obj_gene.save()
    # Add link to KEGG
    if gene_dict.get('kegg_ko') != 'unknown':
        link_to_function(obj_gene, gene_dict)
def insert_gene_list(chunk_genes):
    """
    Parse and insert every raw annotation line of ``chunk_genes``.

    A gene that fails model validation is logged as a warning and skipped, so
    one invalid entry does not abort the rest of the chunk.
    """
    for raw_line in chunk_genes:
        # Parsing happens outside the try so gene_dict is always bound when
        # the ValidationError handler formats its message.
        gene_dict = parse_gene(raw_line)
        try:
            insert_gene(gene_dict)
        except ValidationError as e:
            _LOGGER.warning(f"{e.__dict__} for gene_id: {gene_dict.get('gene_id')}. Insertion skipped.")
def load_annotation_file_to_db_in_chunks(annotation_file, chunk_size=100000):
    """
    Read ``annotation_file`` in chunks of ``chunk_size`` lines and insert each chunk.

    Reading by chunk keeps at most ``chunk_size`` raw lines in memory at once.
    The running count of processed lines is logged after every chunk and once
    more when the whole file has been consumed.
    """
    processed_genes = 0
    with open(annotation_file, 'r') as file:
        # iter() with a sentinel stops at the first empty chunk, i.e. at EOF.
        for chunk_genes in iter(lambda: list(islice(file, chunk_size)), []):
            processed_genes += len(chunk_genes)
            insert_gene_list(chunk_genes)
            _LOGGER.info(f"{processed_genes} genes processed so far...")
    _LOGGER.info(f"[DONE] {processed_genes} genes processed.")
def parse_arguments():
......
#!/usr/bin/env python
import argparse
import logging
import os
import requests
import sys
import django
from django.core.exceptions import ValidationError
# django.setup() must be called before importing the models so that the apps are loaded.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "metagenedb.settings")
django.setup()
from metagenedb.apps.catalog.models import KeggOrthology
# Configure root logging at INFO level and create this module's logger.
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
# KEGG REST endpoint used as the source of the KO list.
KEGG_KO_LIST_API = "http://rest.kegg.jp/list/ko"
def parse_arguments():
    """
    Build the command-line parser and parse the arguments.

    No option is declared. Any SystemExit raised by argparse is turned into
    exit status 1.
    NOTE(review): this also covers --help, which argparse ends with SystemExit —
    confirm that exiting 1 in that case is intended.
    """
    description = f'Populate KEGG KO database from {KEGG_KO_LIST_API}.'
    parser = argparse.ArgumentParser(description=description)
    try:
        parsed = parser.parse_args()
    except SystemExit:
        sys.exit(1)
    return parsed
def parse_ko(line):
    """
    Parse a line from the KEGG KO list and return an organized dict.

    Expected layout: ``<id> TAB <name>; <long name> [EC:<numbers>]``. The id may
    carry a namespace prefix (``ko:K00001``) or be bare (``K00001``).

    Missing parts do not raise: a line without a tab or without a ``;``
    separator yields empty strings for the absent values, leaving it to the
    caller's validation to accept or reject the entry.

    Returns a dict with the keys 'function_id', 'name', 'long_name' and
    'ec_number' ('' when the long name carries no ``[EC:...]`` tag).
    """
    content = line.split('\t')
    # Keep the part after the namespace prefix when there is one.
    id_parts = content[0].split(':')
    function_id = id_parts[1] if len(id_parts) > 1 else id_parts[0]
    names = content[1].split(';') if len(content) > 1 else ['']
    raw_long_name = names[1] if len(names) > 1 else ''
    if '[EC:' in raw_long_name:
        ec_number = raw_long_name.split('[EC:')[1].rstrip(']')
    else:
        ec_number = ''
    return {
        'function_id': function_id,
        'name': names[0],
        'long_name': raw_long_name.lstrip(),
        'ec_number': ec_number
    }
def create_kegg_ko(kegg_ko):
    """
    Create or update the KeggOrthology entry described by the ``kegg_ko`` dict.

    An existing entry (matched on 'function_id') has every key of ``kegg_ko``
    copied onto it; otherwise a new instance is built from the dict. The
    instance is validated with full_clean() before being saved.
    """
    try:
        obj_kegg = KeggOrthology.objects.get(function_id=kegg_ko.get('function_id'))
    except KeggOrthology.DoesNotExist:
        obj_kegg = KeggOrthology(**kegg_ko)
    else:
        for field_name, field_value in kegg_ko.items():
            setattr(obj_kegg, field_name, field_value)
    obj_kegg.full_clean()
    obj_kegg.save()
def run():
    """
    Download the KEGG KO list and create or update one KeggOrthology per line.

    Entries that fail model validation are logged and counted as skipped.
    Progress is logged every 100 successful insertions, and totals are logged
    at the end. An HTTP error status raises through raise_for_status().
    """
    # No option is declared yet; parsing still rejects unexpected arguments.
    parse_arguments()
    # Use the module constant so the URL is defined in one place, and set a
    # timeout so a stalled connection cannot hang the script forever.
    response = requests.get(KEGG_KO_LIST_API, timeout=60)
    response.raise_for_status()
    ko_lines = response.text.splitlines()
    total_kegg = len(ko_lines)
    inserted_kegg = 0
    skipped_kegg = 0
    for line in ko_lines:
        kegg_ko = parse_ko(line)
        try:
            create_kegg_ko(kegg_ko)
        except ValidationError as e:
            skipped_kegg += 1
            _LOGGER.warning(f"{e.__dict__} for function_id: {kegg_ko.get('function_id')}. Insertion skipped.")
            continue
        inserted_kegg += 1
        # Report only right after an insertion, so the same count is not
        # logged again while subsequent entries are being skipped.
        if inserted_kegg % 100 == 0:
            _LOGGER.info(f"{inserted_kegg}/{total_kegg} KEGG KO inserted so far...")
    _LOGGER.info(f"[DONE] {inserted_kegg}/{total_kegg} KEGG KO inserted.")
    _LOGGER.info(f"[DONE] {skipped_kegg}/{total_kegg} KEGG KO skipped.")
    # NOTE(review): the original ended with a bare "Create unknown entry"
    # comment and no code — presumably still to be implemented.
# Run the KEGG KO import only when this file is executed as a script.
if __name__ == "__main__":
run()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment