Commit 365da9d0 authored by Kenzo-Hugo Hillion
Browse files

Merge branch '23-taxonomy-model' into 'master'

Integrate taxonomy to database

Closes #23

See merge request !3
parents cbad11aa e679c49d
Pipeline #13243 passed with stage
in 1 minute and 47 seconds
from .gene import GeneAdmin
from .function import FunctionAdmin, KeggOrthologyAdmin
from .taxonomy import TaxonomyAdmin
__all__ = ['GeneAdmin', 'FunctionAdmin', 'KeggOrthologyAdmin']
__all__ = ['GeneAdmin', 'FunctionAdmin', 'KeggOrthologyAdmin', 'TaxonomyAdmin']
from django.contrib import admin
from metagenedb.apps.catalog.models import Taxonomy
@admin.register(Taxonomy)
class TaxonomyAdmin(admin.ModelAdmin):
    """Admin configuration for the ``Taxonomy`` model."""

    # Columns of the change list: identity fields first, then the lineage
    # columns in the order kingdom -> species.
    list_display = (
        'tax_id',
        'name',
        'rank',
        'kingdom',
        'phylum',
        'class_rank',
        'order',
        'family',
        'genus',
        'species',
    )

    # Fields matched by the admin search box.
    search_fields = (
        'tax_id',
        'name',
    )
# Generated by Django 2.2.1 on 2019-07-17 12:20
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
    """Create the ``Taxonomy`` model in the ``catalog`` app."""

    # Applied on top of the initial catalog migration.
    dependencies = [
        ('catalog', '0001_initial'),
    ]

    operations = [
        migrations.CreateModel(
            name='Taxonomy',
            fields=[
                ('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
                # Unique, indexed identifier stored as text.
                ('tax_id', models.CharField(db_index=True, max_length=20, unique=True)),
                ('name', models.CharField(default='No scientific name', max_length=200)),
                # Closed list of accepted rank values (stored value, display label).
                ('rank', models.CharField(choices=[('infraclass', 'Infraclass'), ('class', 'Class'), ('forma', 'Forma'), ('phylum', 'Phylum'), ('species_subgroup', 'Species subgroup'), ('genus', 'Genus'), ('parvorder', 'Parvorder'), ('subcohort', 'Subcohort'), ('subtribe', 'Subtribe'), ('superphylum', 'Superphylum'), ('subgenus', 'Subgenus'), ('superorder', 'Superorder'), ('species', 'Species'), ('subphylum', 'Subphylum'), ('infraorder', 'Infraorder'), ('section', 'Section'), ('tribe', 'Tribe'), ('cohort', 'Cohort'), ('subsection', 'Subsection'), ('series', 'Series'), ('order', 'Order'), ('subclass', 'Subclass'), ('superfamily', 'Superfamily'), ('superclass', 'Superclass'), ('superkingdom', 'Superkingdom'), ('kingdom', 'Kingdom'), ('family', 'Family'), ('suborder', 'Suborder'), ('subkingdom', 'Subkingdom'), ('subspecies', 'Subspecies'), ('no_rank', 'No rank'), ('subfamily', 'Subfamily'), ('varietas', 'Varietas'), ('species_group', 'Species group')], max_length=20)),
                # Optional self-referential link; set to NULL when the referenced row is deleted.
                ('parent', models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='children', to='catalog.Taxonomy')),
            ],
            options={
                'verbose_name_plural': 'Taxonomy',
            },
        ),
    ]
# Generated by Django 2.2.1 on 2019-07-17 13:51
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
    """
    Add one optional self-referential foreign key per main rank to ``Taxonomy``
    and rename the reverse accessor of ``parent``.
    """

    # Applied on top of the migration that creates the Taxonomy model.
    dependencies = [
        ('catalog', '0002_taxonomy'),
    ]

    # Every added field is nullable, points to catalog.Taxonomy, is set to NULL
    # when the referenced row is deleted, and uses '<name>_children' as its
    # related_name ('class_children' for class_rank).
    operations = [
        migrations.AddField(
            model_name='taxonomy',
            name='class_rank',
            field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='class_children', to='catalog.Taxonomy', verbose_name='class'),
        ),
        migrations.AddField(
            model_name='taxonomy',
            name='family',
            field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='family_children', to='catalog.Taxonomy'),
        ),
        migrations.AddField(
            model_name='taxonomy',
            name='genus',
            field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='genus_children', to='catalog.Taxonomy'),
        ),
        migrations.AddField(
            model_name='taxonomy',
            name='kingdom',
            field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='kingdom_children', to='catalog.Taxonomy'),
        ),
        migrations.AddField(
            model_name='taxonomy',
            name='order',
            field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='order_children', to='catalog.Taxonomy'),
        ),
        migrations.AddField(
            model_name='taxonomy',
            name='phylum',
            field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='phylum_children', to='catalog.Taxonomy'),
        ),
        migrations.AddField(
            model_name='taxonomy',
            name='species',
            field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='species_children', to='catalog.Taxonomy'),
        ),
        # Only the related_name of the existing 'parent' link changes:
        # 'children' becomes 'direct_children'.
        migrations.AlterField(
            model_name='taxonomy',
            name='parent',
            field=models.ForeignKey(blank=True, null=True, on_delete=django.db.models.deletion.SET_NULL, related_name='direct_children', to='catalog.Taxonomy'),
        ),
    ]
from .function import Function, KeggOrthology
from .gene import Gene
from .taxonomy import Taxonomy
__all__ = ['Function', 'KeggOrthology', 'Gene']
__all__ = ['Function', 'KeggOrthology', 'Gene', 'Taxonomy']
from django.db import models
class Taxonomy(models.Model):
    """
    Taxonomy is based on NCBI taxonomy: https://www.ncbi.nlm.nih.gov/taxonomy

    Each entry has a unique ``tax_id``, a ``name``, a ``rank`` and an optional
    link to its direct ``parent``. The optional ``kingdom`` ... ``species``
    links point to other entries of this same model.
    """
    NAME_DEFAULT = "No scientific name"
    # (stored value, display label) pairs accepted for ``rank``.
    RANK_CHOICES = [
        ('infraclass', 'Infraclass'),
        ('class', 'Class'),
        ('forma', 'Forma'),
        ('phylum', 'Phylum'),
        ('species_subgroup', 'Species subgroup'),
        ('genus', 'Genus'),
        ('parvorder', 'Parvorder'),
        ('subcohort', 'Subcohort'),
        ('subtribe', 'Subtribe'),
        ('superphylum', 'Superphylum'),
        ('subgenus', 'Subgenus'),
        ('superorder', 'Superorder'),
        ('species', 'Species'),
        ('subphylum', 'Subphylum'),
        ('infraorder', 'Infraorder'),
        ('section', 'Section'),
        ('tribe', 'Tribe'),
        ('cohort', 'Cohort'),
        ('subsection', 'Subsection'),
        ('series', 'Series'),
        ('order', 'Order'),
        ('subclass', 'Subclass'),
        ('superfamily', 'Superfamily'),
        ('superclass', 'Superclass'),
        ('superkingdom', 'Superkingdom'),
        ('kingdom', 'Kingdom'),
        ('family', 'Family'),
        ('suborder', 'Suborder'),
        ('subkingdom', 'Subkingdom'),
        ('subspecies', 'Subspecies'),
        ('no_rank', 'No rank'),
        ('subfamily', 'Subfamily'),
        ('varietas', 'Varietas'),
        ('species_group', 'Species group'),
    ]

    tax_id = models.CharField(max_length=20, unique=True, db_index=True)
    name = models.CharField(max_length=200, default=NAME_DEFAULT)
    rank = models.CharField(max_length=20, choices=RANK_CHOICES)
    parent = models.ForeignKey(
        'Taxonomy', related_name='direct_children',
        on_delete=models.SET_NULL,
        null=True, blank=True,
    )
    kingdom = models.ForeignKey(
        'Taxonomy', related_name='kingdom_children',
        on_delete=models.SET_NULL,
        null=True, blank=True,
    )
    phylum = models.ForeignKey(
        'Taxonomy', related_name='phylum_children',
        on_delete=models.SET_NULL,
        null=True, blank=True,
    )
    # Named class_rank because ``class`` is a reserved word in Python.
    class_rank = models.ForeignKey(
        'Taxonomy', related_name='class_children',
        on_delete=models.SET_NULL,
        null=True, blank=True,
        verbose_name="class"
    )
    order = models.ForeignKey(
        'Taxonomy', related_name='order_children',
        on_delete=models.SET_NULL,
        null=True, blank=True,
    )
    # related_name follows the same '<rank>_children' pattern as the sibling
    # fields and as the migration that added this column.
    family = models.ForeignKey(
        'Taxonomy', related_name='family_children',
        on_delete=models.SET_NULL,
        null=True, blank=True,
    )
    genus = models.ForeignKey(
        'Taxonomy', related_name='genus_children',
        on_delete=models.SET_NULL,
        null=True, blank=True,
    )
    species = models.ForeignKey(
        'Taxonomy', related_name='species_children',
        on_delete=models.SET_NULL,
        null=True, blank=True,
    )

    def __str__(self):
        return f"{self.name}"

    def build_parental_hierarchy(self):
        """
        Walk up the ``parent`` links and collect the tax_id found at each rank.

        Returns a dict mapping rank -> tax_id for this entry and its ancestors,
        plus a ``'tax_id'`` key holding this entry's own tax_id. An entry named
        ``'root'`` or without parent stops the walk and contributes no rank
        key. When the same rank occurs several times along the chain, the
        ancestor's value replaces the descendant's one.
        """
        hierarchy = {}
        if self.name != 'root' and self.parent is not None:
            hierarchy[self.rank] = self.tax_id
            # Ancestors are merged after the current rank, so they win on key clashes.
            hierarchy.update(self.parent.build_parental_hierarchy())
        # Set last: the recursive call stored the parent's tax_id under this key.
        hierarchy['tax_id'] = self.tax_id
        return hierarchy

    class Meta:
        verbose_name_plural = "Taxonomy"
from unittest import TestCase
from .taxonomy import Taxonomy
class TestBuildHierarchy(TestCase):
    """Check ``Taxonomy.build_parental_hierarchy`` on a three-level chain."""

    @classmethod
    def setUpClass(cls):
        """
        Build a root -> kingdom -> phylum chain shared by the tests
        """
        root = Taxonomy(tax_id="1", name="root", rank="no_rank")
        kingdom = Taxonomy(tax_id="2", name="KINGDOM", rank="kingdom", parent=root)
        phylum = Taxonomy(tax_id="3", name="PHYLUM", rank="phylum", parent=kingdom)
        cls.root = root
        cls.kingdom = kingdom
        cls.phylum = phylum

    def test_build_hierarchy(self):
        # The root entry must not show up as a rank in the result.
        observed = self.phylum.build_parental_hierarchy()
        self.assertDictEqual(
            observed,
            {'tax_id': '3', 'phylum': '3', 'kingdom': '2'},
        )
from .function import FunctionSerializer
from .gene import GeneSerializer
from .taxonomy import TaxonomySerializer
__all__ = ['FunctionSerializer', 'GeneSerializer', 'TaxonomySerializer']
from rest_framework import serializers
from metagenedb.apps.catalog.models import Function
class FunctionSerializer(serializers.ModelSerializer):
    """Model serializer exposing a fixed subset of ``Function`` fields."""

    class Meta:
        model = Function
        fields = (
            'function_id',
            'source',
            'name',
        )
from rest_framework import serializers
from metagenedb.apps.catalog.models import Gene
from metagenedb.apps.catalog.serializers import FunctionSerializer
class GeneSerializer(serializers.ModelSerializer):
    """Model serializer for ``Gene`` with its functions nested read-only."""

    # Nested, read-only representation of the linked functions.
    functions = FunctionSerializer(read_only=True, many=True)

    class Meta:
        model = Gene
        fields = (
            'gene_id',
            'gene_length',
            'functions',
        )
from rest_framework import serializers
from metagenedb.apps.catalog.models import Taxonomy
def _optional_tax_id_field(**extra):
    """Build an optional related field that refers to a Taxonomy by its ``tax_id``."""
    return serializers.SlugRelatedField(
        queryset=Taxonomy.objects.all(),
        slug_field='tax_id',
        required=False,
        **extra,
    )


class TaxonomySerializer(serializers.ModelSerializer):
    """
    Model serializer for ``Taxonomy``.

    Every link to another taxonomy entry is exposed through the ``tax_id`` of
    the linked entry and is optional in the input.
    """

    rank = serializers.CharField(required=False)
    # Exposed as 'parent_tax_id' while reading/writing the model's 'parent' link.
    parent_tax_id = _optional_tax_id_field(source='parent')
    kingdom = _optional_tax_id_field()
    phylum = _optional_tax_id_field()
    class_rank = _optional_tax_id_field()
    order = _optional_tax_id_field()
    family = _optional_tax_id_field()
    genus = _optional_tax_id_field()
    species = _optional_tax_id_field()

    class Meta:
        model = Taxonomy
        fields = (
            'tax_id',
            'name',
            'rank',
            'parent_tax_id',
            'kingdom',
            'phylum',
            'class_rank',
            'order',
            'family',
            'genus',
            'species',
        )
import logging
from rest_framework import status
from rest_framework.decorators import (
api_view,
......@@ -7,8 +9,28 @@ from rest_framework.decorators import (
from rest_framework.response import Response
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
from metagenedb.apps.catalog.models import Gene
from metagenedb.apps.catalog.models import Function, Gene
from metagenedb.apps.catalog.serializers import GeneSerializer
from metagenedb.apps.catalog.views.insertion_model import InsertionBase
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
class GeneInsertion(InsertionBase):
    """Insertion helper for ``Gene`` entries, looked up by ``gene_id``."""

    MANY_TO_MANY_FIELDS = ['kegg_ko']
    model = Gene
    obj_id = "gene_id"

    def _link_kegg_ko(self, function_id):
        """
        Attach the ``Function`` identified by ``function_id`` to the current gene.

        The value ``'unknown'`` is ignored. An identifier that matches no
        ``Function`` is reported with a warning and otherwise ignored.
        """
        skipped_values = ('unknown',)
        if function_id in skipped_values:
            return
        try:
            kegg_function = Function.objects.get(function_id=function_id)
            self.obj.functions.add(kegg_function)
            self.full_clean_and_save()
        except Function.DoesNotExist:
            _LOGGER.warning(f"{function_id} not found in the database. Full dict: {self.full_dict}.")
@api_view(['GET'])
......
from abc import ABC
from metagenedb.utils.dict_operations import extract_dict
class InsertionBase(ABC):
    """
    Base for insertion in DB for different models.

    This base will be used for POST methods but also direct insertion to DB from scripts.

    Subclasses provide ``model`` and ``obj_id`` (the name of the field used to
    look an existing object up) and one ``_link_<field>(value)`` method for
    every name listed in ``FOREIGN_KEY_FIELDS`` or ``MANY_TO_MANY_FIELDS``.
    """
    MANY_TO_MANY_FIELDS = []
    FOREIGN_KEY_FIELDS = []
    SIMPLE_FIELDS = []  # Fields you want to be able to create with the class

    @property
    def model(self):
        raise NotImplementedError

    @property
    def obj_id(self):
        raise NotImplementedError

    def __init__(self, model_dict):
        """
        Split ``model_dict`` into foreign-key, many-to-many and simple values.

        ``model_dict`` itself is left untouched: the split is done on a
        private copy.
        """
        self.full_dict = model_dict.copy()
        # extract_dict is called in its default mode, where it removes the
        # extracted keys from the dict it is given; hand it a copy so that the
        # caller's dict is not modified behind its back.
        remaining = model_dict.copy()
        self.foreign_key_dict = extract_dict(remaining, self.FOREIGN_KEY_FIELDS)
        self.many_to_many_dict = extract_dict(remaining, self.MANY_TO_MANY_FIELDS)
        if self.SIMPLE_FIELDS:
            self.simple_dict = extract_dict(remaining, self.SIMPLE_FIELDS)
        else:
            # No explicit list: everything that is not a relation is a simple field.
            self.simple_dict = remaining
        self.obj = None

    def upsert_to_db(self):
        """Update the object matching ``obj_id`` or create it, then link its relations."""
        try:
            self.obj = self.model.objects.get(**{self.obj_id: self.full_dict.get(self.obj_id)})
            for key, value in self.simple_dict.items():
                setattr(self.obj, key, value)
        except self.model.DoesNotExist:
            self.create_obj()
        # The object is validated and saved before the relation handlers run.
        self.full_clean_and_save()
        self.handle_foreign_fields()
        self.handle_many_to_many_fields()

    def create_obj(self):
        """Instantiate (without saving) a new object from the simple fields."""
        self.obj = self.model(**self.simple_dict)

    def full_clean_and_save(self):
        """Validate the current object and save it."""
        self.obj.full_clean()
        self.obj.save()

    def handle_foreign_fields(self):
        """Call ``_link_<key>(value)`` for every extracted foreign-key entry."""
        for key, value in self.foreign_key_dict.items():
            getattr(self, f"_link_{key}")(value)

    def handle_many_to_many_fields(self):
        """Call ``_link_<key>(value)`` for every extracted many-to-many entry."""
        for key, value in self.many_to_many_dict.items():
            getattr(self, f"_link_{key}")(value)
import logging
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
def extract_dict(source_dict, keys, keep_original_source=False):
    """
    Build a new dict holding the entries of ``source_dict`` listed in ``keys``.

    Unless ``keep_original_source`` is true, every extracted entry is also
    removed from ``source_dict``. Keys absent from ``source_dict`` are reported
    with a warning and left out of the result.
    """
    remove_from_source = not keep_original_source
    picked = {}
    for wanted in keys:
        try:
            picked[wanted] = source_dict[wanted]
            if remove_from_source:
                del source_dict[wanted]
        except KeyError:
            _LOGGER.warning(f"[{wanted}] is not found in the source dict, extraction skipped for this key.")
    return picked
import logging
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
class KEGGLineParser(object):
    """Parsers for single lines of KEGG flat listings."""

    @staticmethod
    def ko_list(line):
        """
        Parse line from kegg KO list (http://rest.kegg.jp/list/ko) to return organized dict

        The line is expected to look like ``ko:<id>\\t<name>; <long name> [EC:<numbers>]``.
        A trailing line terminator is ignored. Returns a dict with the keys
        ``function_id``, ``name``, ``long_name`` and ``ec_number`` (empty string
        when the line carries no ``[EC:...]`` tag). Any parsing failure is
        logged and re-raised.
        """
        try:
            # Drop the line terminator first: otherwise it ends up in long_name
            # and prevents the closing ']' of the EC tag from being stripped.
            elements = line.rstrip('\r\n').split('\t')
            function_id = elements[0].split(':')[1]
            if ';' in elements[1]:
                # Split on the first ';' only so that a long name containing
                # further semicolons (and its EC tag) is kept whole.
                names = elements[1].split(';', 1)
            else:
                _LOGGER.warning(f"Parsing issue with {function_id}, corresponding line: {line}")
                names = [elements[1], '']  # Ugly fix to handle one specific case with no name: K23479
            if '[EC:' in names[1]:
                ec_number = names[1].split('[EC:')[1].rstrip(']')
            else:
                ec_number = ''
            return {
                'function_id': function_id,
                'name': names[0],
                'long_name': names[1].lstrip(),
                'ec_number': ec_number
            }
        except Exception:
            _LOGGER.error(f"Could not parse: {line.rstrip()}. Are you sure it comes from KEGG KO list?")
            raise
class NCBITaxonomyLineParser(object):
    """Parsers for single lines of the NCBI taxonomy dump files."""

    @staticmethod
    def node(line):
        """
        parse line from ncbi nodes.dmp file

        From documentation:

        nodes.dmp file consists of taxonomy nodes.
        The description for each node includes the following fields:
            tax_id -- node id in GenBank taxonomy database
            parent tax_id -- parent node id in GenBank taxonomy database
            rank -- rank of this node (superkingdom, kingdom, ...)
            embl code -- locus-name prefix; not unique
            division id -- see division.dmp file
            inherited div flag (1 or 0) -- 1 if node inherits division from parent
            genetic code id -- see gencode.dmp file
            inherited GC flag (1 or 0) -- 1 if node inherits genetic code from parent
            mitochondrial genetic code id -- see gencode.dmp file
            inherited MGC flag (1 or 0) -- 1 if node inherits mitochondrial gencode from parent
            GenBank hidden flag (1 or 0) -- 1 if name is suppressed in GenBank entry lineage
            hidden subtree root flag (1 or 0) -- 1 if this subtree has no sequence data yet
            comments -- free-text comments and citations

        Returns a dict of strings keyed by field name; spaces in the rank are
        replaced by underscores. A line with too few '|'-separated fields is
        logged and raises IndexError.
        """
        elements = [element.strip() for element in line.rstrip().split('|')]
        try:
            return {
                "tax_id": elements[0],
                "parent_tax_id": elements[1],
                # 'no rank' -> 'no_rank', 'species group' -> 'species_group', ...
                "rank": elements[2].replace(' ', '_'),
                "embl_code": elements[3],
                "division_id": elements[4],
                "inherited_div_flag": elements[5],
                "genetic_code_id": elements[6],
                "inherited_GC_flag": elements[7],
                "mitochondrial_genetic_code_id": elements[8],
                "inherited_MGC_flag": elements[9],
                "GenBank_hidden_flag": elements[10],
                "hidden_subtree_root_flag": elements[11],
                "comments": elements[12]
            }
        except IndexError:
            _LOGGER.error(f"Could not parse: {line.rstrip()}. Are you sure it comes from nodes.dmp file?")
            raise

    @staticmethod
    def name(line):
        """
        parse line from ncbi names.dmp file

        From documentation:

        Taxonomy names file (names.dmp):
            tax_id -- the id of node associated with this name
            name_txt -- name itself
            unique name -- the unique variant of this name if name not unique
            name class -- (synonym, common name, ...)

        Returns a dict of stripped strings keyed by field name. A line with
        too few '|'-separated fields is logged and raises IndexError.
        """
        elements = [element.strip() for element in line.rstrip().split('|')]
        try:
            return {
                "tax_id": elements[0],
                "name_txt": elements[1],
                "unique_name": elements[2],
                "name_class": elements[3],
            }
        except IndexError:
            _LOGGER.error(f"Could not parse: {line.rstrip()}. Are you sure it comes from names.dmp file?")
            raise
from unittest import TestCase
from metagenedb.utils.dict_operations import extract_dict
class TestExtractDict(TestCase):
def test_extract_dict(self):
source_dict = {'a': 1, 'b': 2}
extract_keys = ['b']