Commit 84a1b981 authored by Kenzo-Hugo Hillion
Browse files

Merge branch '155-taxonomy-details-from-genes' into 'dev'

Retrieve taxonomy details from gene list

Closes #155

See merge request !68
parents 8c35c9e0 01a8a5a8
Pipeline #37912 passed with stages
in 3 minutes and 31 seconds
......@@ -7,7 +7,7 @@ stages:
# Only needed when using a docker container to run your tests in.
# Check out: http://docs.gitlab.com/ce/ci/docker/using_docker_images.html#what-is-a-service
services:
- docker:dind
- docker:18-dind
variables:
POSTGRES_DB: postgres
......
......@@ -52,36 +52,6 @@ class GeneViewSet(BulkViewSet):
response['Content-Disposition'] = 'attachment; filename=%s' % filename
return response
def _extract_taxonomy_info(self, gene):
    """
    Return the taxonomy columns of a gene as [tax_id, name, rank].

    A gene without taxonomy yields three empty strings.
    """
    taxonomy = gene.taxonomy
    if taxonomy is not None:
        return [taxonomy.tax_id, taxonomy.name, taxonomy.rank]
    return ['', '', '']
def _extract_function_info(self, gene):
    """
    Return the function columns of a gene as [kegg_ids, eggnog_ids].

    Each item holds the function ids of that source joined with ';', or an
    empty string when the gene has no function from that source.
    """
    function_ids = {
        'kegg': [],
        'eggnog': []
    }
    # Walk the related functions once; a gene without functions simply
    # leaves both lists empty, so no separate emptiness check is needed.
    for function in gene.functions.all():
        ids = function_ids.get(function.source)
        # Only kegg and eggnog have a column: a function from any other
        # source is skipped instead of failing on ``None.append``.
        if ids is not None:
            ids.append(function.function_id)
    return [
        ';'.join(function_ids['kegg']),
        ';'.join(function_ids['eggnog'])
    ]
def _get_metadata_line(self, gene):
    """
    Build the comma-separated metadata line of one gene:
    gene columns first, then taxonomy columns, then function columns.
    """
    columns = [gene.gene_id, gene.name, gene.source, gene.length]
    columns += self._extract_taxonomy_info(gene)
    columns += self._extract_function_info(gene)
    return ','.join(map(str, columns))
def _build_csv_response(self):
queryset = self.filter_queryset(self.get_queryset())
queryset = queryset.select_related("taxonomy").prefetch_related("functions")
......@@ -89,13 +59,10 @@ class GeneViewSet(BulkViewSet):
return self.too_many_genes_error_response
with StringIO() as csv_file:
# Write header
header = ",".join([
'gene_id', 'gene_name', 'gene_source', 'length', 'tax_id', 'tax_name', 'tax_rank',
'kegg_id', 'eggnog_id',
])
header = Gene.CSV_HEADER
csv_file.write(f"{header}\n")
for gene in queryset:
csv_file.write(f"{self._get_metadata_line(gene)}\n")
for gene in queryset.iterator():
csv_file.write(f"{gene.csv}\n")
# generate the file
response = HttpResponse(csv_file.getvalue(), content_type='text/csv')
filename = 'metagenedb.csv'
......
......@@ -3,9 +3,6 @@ from django.urls import reverse
from rest_framework import status
from metagenedb.api.catalog.views import GeneViewSet
from metagenedb.apps.catalog.factory import (
GeneFactory, GeneWithEggNOGFactory, GeneWithKeggFactory, GeneWithTaxonomyFactory
)
class GeneViewSetMock(GeneViewSet):
......@@ -26,54 +23,3 @@ class TestGenes(TestCase):
url = reverse('api:catalog:v1:genes-list')
resp = self.client.get(url)
self.assertEqual(resp.status_code, status.HTTP_200_OK)
def test_get_metadata_line_no_functions(self):
    """Gene from GeneFactory: the taxonomy and function columns are expected empty."""
    gene = GeneFactory()
    columns = [gene.gene_id, gene.name, gene.source, gene.length] + [''] * 5
    expected_line = ','.join(map(str, columns))
    # Run the method through the GeneViewSet mock
    tested_line = GeneViewSetMock()._get_metadata_line(gene)
    self.assertEqual(tested_line, expected_line)
def test_get_metadata_line_with_taxonomy(self):
    """Gene from GeneWithTaxonomyFactory: taxonomy columns filled, function columns empty."""
    gene = GeneWithTaxonomyFactory()
    taxonomy = gene.taxonomy
    columns = [gene.gene_id, gene.name, gene.source, gene.length]
    columns += [taxonomy.tax_id, taxonomy.name, taxonomy.rank]
    columns += ['', '']
    expected_line = ','.join(map(str, columns))
    # Run the method through the GeneViewSet mock
    tested_line = GeneViewSetMock()._get_metadata_line(gene)
    self.assertEqual(tested_line, expected_line)
def test_get_metadata_line_with_kegg(self):
    """Gene from GeneWithKeggFactory: its first function id is expected in the kegg column."""
    gene = GeneWithKeggFactory()
    columns = [gene.gene_id, gene.name, gene.source, gene.length]
    columns += ['', '', '']
    columns += [gene.functions.all()[0].function_id, '']
    expected_line = ','.join(map(str, columns))
    # Run the method through the GeneViewSet mock
    tested_line = GeneViewSetMock()._get_metadata_line(gene)
    self.assertEqual(tested_line, expected_line)
def test_get_metadata_line_with_eggnog(self):
    """Gene from GeneWithEggNOGFactory: its first function id is expected in the eggnog column."""
    gene = GeneWithEggNOGFactory()
    columns = [gene.gene_id, gene.name, gene.source, gene.length]
    columns += ['', '', '']
    columns += ['', gene.functions.all()[0].function_id]
    expected_line = ','.join(map(str, columns))
    # Run the method through the GeneViewSet mock
    tested_line = GeneViewSetMock()._get_metadata_line(gene)
    self.assertEqual(tested_line, expected_line)
......@@ -40,6 +40,8 @@ class DbGenerator:
rank=rank,
parent=getattr(self, "last_tax", None)
)
else:
self.last_tax = models.Taxonomy.objects.get(tax_id=desc['tax_id'])
self.created_ids.add(desc['tax_id'])
self.last_tax.build_hierarchy()
......
......@@ -9,8 +9,9 @@ from metagenedb.apps.catalog.models import (
Gene, Function, Taxonomy
)
from metagenedb.apps.catalog.management.commands.compute_stats import (
ComputeStatistics, ComputeCounts, ComputeGeneLength, ComputeTaxonomyRepartition, ComputeTaxonomyPresence
ComputeCounts, ComputeGeneLength, ComputeTaxonomyRepartition, ComputeTaxonomyPresence
)
from metagenedb.apps.catalog.management.commands.compute_stats import clean_db as clean_db_stats
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
logger = logging.getLogger()
......@@ -37,7 +38,7 @@ def create_genes_db():
def compute_stats():
ComputeStatistics('all').clean_db()
clean_db_stats()
for gene_source in ['all', 'virgo', 'igc']:
ComputeCounts(gene_source).all()
ComputeGeneLength(gene_source).all()
......
# Generated by Django 3.1 on 2020-09-16 14:41
from django.db import migrations, models
class Migration(migrations.Migration):
    """Alter ``statistics.body`` and ``taxonomy.hierarchy`` to ``models.JSONField``."""

    dependencies = [('catalog', '0026_meta_statistics')]

    operations = [
        # statistics.body: JSONField without extra options
        migrations.AlterField(model_name='statistics', name='body', field=models.JSONField()),
        # taxonomy.hierarchy: nullable JSONField
        migrations.AlterField(model_name='taxonomy', name='hierarchy', field=models.JSONField(null=True)),
    ]
from itertools import repeat
from django.db import models
from .function import Function
......@@ -12,6 +14,11 @@ class Gene(models.Model):
(IGC, 'IGC'),
(VIRGO, 'Virgo'),
]
# Column names of the gene CSV export, in output order
CSV_HEADER = ','.join(
    ['gene_id', 'gene_name', 'gene_source', 'length']  # gene columns
    + ['tax_id', 'tax_name', 'tax_rank', 'tax_full']  # taxonomy columns
    + ['kegg_id', 'eggnog_id']  # function columns
)
gene_id = models.SlugField(max_length=100, db_index=True, unique=True)
name = models.CharField(max_length=100, unique=True)
......@@ -25,13 +32,54 @@ class Gene(models.Model):
)
source = models.CharField(max_length=10, choices=SOURCE_CHOICES, default=UNDEFINED)
def __str__(self):
def __str__(self) -> str:
return self.gene_id
@property
def fasta(self):
def fasta(self) -> str:
return f">{self.gene_id}\n{self.sequence}\n"
@property
def csv_header(self) -> str:
    """Return the CSV header line stored in ``CSV_HEADER``."""
    header = self.CSV_HEADER
    return header
@property
def csv_gene(self) -> str:
    """Return the gene's own CSV columns: name, source and length (gene_id is not part of it)."""
    length = str(self.length)
    columns = (self.name, self.source, length)
    return ",".join(columns)
@property
def csv_tax(self) -> str:
    """Return the taxonomy CSV columns, or four empty columns when the gene has no taxonomy."""
    taxonomy = self.taxonomy
    if taxonomy is not None:
        return taxonomy.csv
    return ",".join([''] * 4)
@property
def csv_functions(self) -> str:
    """
    Return the function CSV columns as "<kegg ids>,<eggnog ids>".

    Ids of the same source are joined with ';'. A gene without functions
    yields "," (two empty columns).
    """
    function_ids = {
        'kegg': [],
        'eggnog': []
    }
    # Walk the related functions once; with no functions both lists stay
    # empty, which already produces the two empty columns.
    for function in self.functions.all():
        ids = function_ids.get(function.source)
        # Only kegg and eggnog have a column: a function from any other
        # source is skipped instead of failing on ``None.append``.
        if ids is not None:
            ids.append(function.function_id)
    return ",".join([
        ';'.join(function_ids['kegg']),
        ';'.join(function_ids['eggnog'])
    ])
@property
def csv(self) -> str:
    """Return the full CSV line of the gene: gene_id, gene columns, taxonomy columns, function columns."""
    sections = (self.gene_id, self.csv_gene, self.csv_tax, self.csv_functions)
    return ",".join(sections)
# Model options: default ordering on gene_id, descending (leading '-')
class Meta:
ordering = ['-gene_id']
......
from django.db import models
from django.contrib.postgres.fields import JSONField
class Statistics(models.Model):
......@@ -8,7 +7,7 @@ class Statistics(models.Model):
"""
stats_id = models.SlugField(max_length=400, db_index=True, unique=True)
body = JSONField()
body = models.JSONField()
class Meta:
verbose_name_plural = "Statistics"
from django.db import models
from django.contrib.postgres.fields import JSONField
class Taxonomy(models.Model):
......@@ -43,6 +42,9 @@ class Taxonomy(models.Model):
('varietas', 'Varietas'),
('species_group', 'Species group'),
]
# Column names of the taxonomy CSV export, in output order
CSV_HEADER = ','.join(
    ('tax_id', 'tax_name', 'tax_rank', 'tax_full')
)
tax_id = models.CharField(max_length=20, unique=True, db_index=True)
name = models.CharField(max_length=200, default=NAME_DEFAULT)
......@@ -52,7 +54,7 @@ class Taxonomy(models.Model):
on_delete=models.SET_NULL,
null=True, blank=True,
)
hierarchy = JSONField(null=True)
hierarchy = models.JSONField(null=True)
def __str__(self) -> str:
    """Represent the taxonomy entry by its name."""
    return "{}".format(self.name)
......@@ -73,6 +75,40 @@ class Taxonomy(models.Model):
self.save()
return hierarchy
def _compute_one_line_detailed_taxonomy(self) -> str:
    """
    Render the hierarchy as "k__..; p__..; c__..; o__..; f__..; g__..; s__..".

    The hierarchy is built first when it is still None. A rank missing from
    the hierarchy is rendered with an empty name. The kingdom slot falls
    back to 'superkingdom' when 'kingdom' is absent. The species slot only
    keeps the last whitespace-separated word of the species name.
    """
    if self.hierarchy is None:
        self.build_hierarchy()
    hierarchy = self.hierarchy
    default_item = {
        'name': ''
    }

    species = hierarchy.get('species', None)
    # split() returns [] for an empty or whitespace-only name; guard the
    # [-1] lookup so such a name renders as an empty species instead of
    # raising IndexError.
    species_words = species['name'].split() if species is not None else []
    s = species_words[-1] if species_words else ''

    kingdom = hierarchy.get('kingdom', hierarchy.get('superkingdom', default_item))
    items = [f"k__{kingdom['name']}"]
    for prefix, rank in (('p', 'phylum'), ('c', 'class'), ('o', 'order'),
                         ('f', 'family'), ('g', 'genus')):
        items.append(f"{prefix}__{hierarchy.get(rank, default_item)['name']}")
    items.append(f"s__{s}")
    return "; ".join(items)
@property
def one_line_detailed_taxonomy(self) -> str:
    """
    Return the one-line detailed taxonomy, computing it on first access.

    The value is kept on the instance as ``_one_line_detailed_taxonomy`` and
    reused by later accesses.
    """
    cached = getattr(self, '_one_line_detailed_taxonomy', None)
    if cached is None:
        cached = self._compute_one_line_detailed_taxonomy()
        self._one_line_detailed_taxonomy = cached
    return cached
@property
def csv_header(self) -> str:
    """Return the CSV header line stored in ``CSV_HEADER``."""
    header = self.CSV_HEADER
    return header
@property
def csv(self) -> str:
    """
    Return the taxonomy CSV columns: tax_id, name, rank, one-line detailed taxonomy.

    Fields are written through the standard csv writer, so a value that
    contains a comma, a double quote or a line break is quoted instead of
    shifting the columns. Values without such characters come out exactly
    as a plain ','.join would produce them. A None value is written as an
    empty field.
    """
    # Imported locally under another name: this property is itself called ``csv``.
    import csv as csv_module
    from io import StringIO

    buffer = StringIO()
    # Empty line terminator: callers append their own line break.
    csv_module.writer(buffer, lineterminator='').writerow(
        [self.tax_id, self.name, self.rank, self.one_line_detailed_taxonomy]
    )
    return buffer.getvalue()
# Model options: plural display name "Taxonomy" and default ordering on
# tax_id, descending (leading '-')
class Meta:
verbose_name_plural = "Taxonomy"
ordering = ['-tax_id']
from rest_framework.test import APITestCase
from metagenedb.apps.catalog.factory import (
GeneWithEggNOGFactory, GeneWithKeggFactory, GeneWithTaxonomyFactory
)
class TestGeneCSV(APITestCase):
    """Check the CSV properties of genes built by the EggNOG, KEGG and taxonomy factories."""

    @classmethod
    def setUpTestData(cls):
        """
        Create one gene per factory, shared by all tests of the class
        """
        cls.gene_eggnog = GeneWithEggNOGFactory.create()
        cls.gene_kegg = GeneWithKeggFactory()
        cls.gene_tax = GeneWithTaxonomyFactory()

    def test_csv_header(self):
        expected_header = 'gene_id,gene_name,gene_source,length,tax_id,tax_name,tax_rank,tax_full,kegg_id,eggnog_id'
        tested_header = self.gene_tax.csv_header
        self.assertEqual(tested_header, expected_header)

    def test_csv_gene(self):
        gene = self.gene_tax
        expected = "{},{},{}".format(gene.name, gene.source, gene.length)
        self.assertEqual(gene.csv_gene, expected)

    def test_csv_tax(self):
        taxonomy = self.gene_tax.taxonomy
        expected = "{},{},{},{}".format(
            taxonomy.tax_id, taxonomy.name, taxonomy.rank, taxonomy.one_line_detailed_taxonomy
        )
        self.assertEqual(self.gene_tax.csv_tax, expected)

    def test_csv_tax_empty(self):
        # Four empty taxonomy columns are expected for both genes
        expected = ",,,"
        for gene in (self.gene_kegg, self.gene_eggnog):
            self.assertEqual(gene.csv_tax, expected)

    def test_csv_functions(self):
        # KEGG id is expected in the first column, EggNOG id in the second
        kegg_id = self.gene_kegg.functions.all()[0].function_id
        self.assertEqual(self.gene_kegg.csv_functions, f"{kegg_id},")
        eggnog_id = self.gene_eggnog.functions.all()[0].function_id
        self.assertEqual(self.gene_eggnog.csv_functions, f",{eggnog_id}")

    def test_csv_functions_empty(self):
        self.assertEqual(self.gene_tax.csv_functions, ",")

    def test_csv(self):
        gene = self.gene_tax
        taxonomy = gene.taxonomy
        fields = [
            gene.gene_id, gene.name, gene.source, gene.length,
            taxonomy.tax_id, taxonomy.name, taxonomy.rank, taxonomy.one_line_detailed_taxonomy,
            '', '',  # the two function columns are expected empty
        ]
        expected = ",".join("{}".format(field) for field in fields)
        self.assertEqual(gene.csv, expected)
......@@ -27,6 +27,12 @@ class TestBuildHierarchy(APITestCase):
rank="phylum",
parent=cls.kingdom
)
cls.species = TaxonomyFactory(
tax_id="4",
name="Genus Species",
rank="species",
parent=cls.phylum
)
def test_build_hierarchy(self):
expected_dict = {
......@@ -44,3 +50,20 @@ class TestBuildHierarchy(APITestCase):
self.assertDictEqual(test_dict, expected_dict)
self.assertIsNotNone(getattr(self.phylum, 'hierarchy'))
self.assertDictEqual(getattr(self.phylum, 'hierarchy'), expected_dict)
def test_compute_one_line_detailed_taxonomy(self):
    """Check the one-line taxonomy of the root, kingdom, phylum and species fixtures, in that order."""
    cases = [
        (self.root, "k__; p__; c__; o__; f__; g__; s__"),
        (self.kingdom, "k__KINGDOM; p__; c__; o__; f__; g__; s__"),
        (self.phylum, "k__KINGDOM; p__PHYLUM; c__; o__; f__; g__; s__"),
        (self.species, "k__KINGDOM; p__PHYLUM; c__; o__; f__; g__; s__Species"),
    ]
    for taxonomy, expected_str in cases:
        self.assertEqual(taxonomy.one_line_detailed_taxonomy, expected_str)
def test_csv(self):
    """The species CSV line is expected to be tax_id, name, rank and one-line taxonomy joined by commas."""
    species = self.species
    expected = "{},{},{},{}".format(
        species.tax_id, species.name, species.rank, species.one_line_detailed_taxonomy
    )
    self.assertEqual(species.csv, expected)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment