Commit a6bc8b0f authored by Kenzo-Hugo Hillion
Browse files

refactor taxonomy handling when importing genes

parent 657d1d9e
Pipeline #30255 passed with stages
in 3 minutes and 48 seconds
import logging
from metagenedb.apps.catalog.models import Taxonomy
# Set the log line format ("[time] LEVEL:logger name:message") through
# logging.basicConfig, then create the logger used by this module.
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
logger = logging.getLogger(__name__)
class HandleTaxonomy:
    """Resolve taxonomy names to Taxonomy instances through per-rank caches.

    One name -> instance dict is kept per rank (phylum, genus, species).
    Each dict is built from the database the first time its property is
    read and stored on the instance as ``_<rank>_mapping``.
    """

    def _build_taxo_mapping(self, rank):
        """Return a dict of every Taxonomy of ``rank``, keyed by its name."""
        logger.info("Building local mapping for %s level...", rank)
        by_name = {}
        for taxonomy in Taxonomy.objects.filter(rank=rank):
            by_name[taxonomy.name] = taxonomy
        return by_name

    @property
    def phylum_mapping(self):
        """Name -> Taxonomy dict for the phylum rank, built on first access."""
        cached = getattr(self, '_phylum_mapping', None)
        if cached is None:
            cached = self._phylum_mapping = self._build_taxo_mapping("phylum")
        return cached

    @property
    def genus_mapping(self):
        """Name -> Taxonomy dict for the genus rank, built on first access."""
        cached = getattr(self, '_genus_mapping', None)
        if cached is None:
            cached = self._genus_mapping = self._build_taxo_mapping("genus")
        return cached

    @property
    def species_mapping(self):
        """Name -> Taxonomy dict for the species rank, built on first access."""
        cached = getattr(self, '_species_mapping', None)
        if cached is None:
            cached = self._species_mapping = self._build_taxo_mapping("species")
        return cached

    def _retrieve_taxonomy(self, name, rank='species', unknown_val='unknown'):
        """Look up ``name`` in the mapping of the given ``rank``.

        Returns None when ``name`` equals ``unknown_val``, when the name is
        absent from the mapping, or when no ``<rank>_mapping`` attribute
        exists on this object.
        """
        if name != unknown_val:
            # A rank without a matching property falls back to an empty dict.
            rank_mapping = getattr(self, f"{rank}_mapping", {})
            return rank_mapping.get(name)
        return None
from rest_framework.test import APITestCase
from metagenedb.apps.catalog.factory import (
TaxonomyFactory,
)
from metagenedb.apps.catalog.management.commands.commons.handle_taxonomy import HandleTaxonomy
class TestRetrieveTaxonomy(APITestCase):
    """Check HandleTaxonomy._retrieve_taxonomy for known, missing and unknown names.

    The test database holds one genus and one phylum entry created with
    TaxonomyFactory.
    """

    @classmethod
    def setUpTestData(cls):
        cls.genus = TaxonomyFactory(rank='genus')
        cls.phylum = TaxonomyFactory(rank='phylum')

    def setUp(self):
        self.unknown = 'unknown'
        # A fresh instance per test so cached mappings are never shared.
        self.handle_taxonomy = HandleTaxonomy()

    def test_genus_only(self):
        tested_taxonomy = self.handle_taxonomy._retrieve_taxonomy(
            self.genus.name, rank='genus', unknown_val=self.unknown
        )
        self.assertEqual(tested_taxonomy.tax_id, self.genus.tax_id)

    def test_genus_not_in_db(self):
        tested_taxonomy = self.handle_taxonomy._retrieve_taxonomy(
            "Fake Name", rank="genus", unknown_val=self.unknown
        )
        self.assertIsNone(tested_taxonomy)

    def test_phylum_only(self):
        tested_taxonomy = self.handle_taxonomy._retrieve_taxonomy(
            self.phylum.name, rank="phylum", unknown_val=self.unknown
        )
        self.assertEqual(tested_taxonomy.tax_id, self.phylum.tax_id)

    def test_phylum_not_in_db(self):
        # The name must differ from the unknown marker and the rank must be
        # "phylum"; otherwise the lookup short-circuits and the phylum
        # mapping is never consulted.
        tested_taxonomy = self.handle_taxonomy._retrieve_taxonomy(
            "Fake Name", rank="phylum", unknown_val=self.unknown
        )
        self.assertIsNone(tested_taxonomy)

    def test_both_unknown(self):
        # The unknown marker yields None regardless of rank (default rank here).
        tested_taxonomy = self.handle_taxonomy._retrieve_taxonomy(self.unknown)
        self.assertIsNone(tested_taxonomy)
......@@ -4,15 +4,15 @@ from django.core.management.base import BaseCommand
from slugify import slugify
from metagenedb.apps.catalog.management.commands.commons.handle_functions import HandleFunctions
from metagenedb.apps.catalog.management.commands.commons.handle_taxonomy import HandleTaxonomy
from metagenedb.apps.catalog.management.commands.commons.import_genes import BaseImportGenes
from metagenedb.apps.catalog.models import Taxonomy
from metagenedb.common.utils.parsers import IGCLineParser
# Set the log line format ("[time] LEVEL:logger name:message") through
# logging.basicConfig, then create the logger used by this module.
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
logger = logging.getLogger(__name__)
class ImportIGCGenes(BaseImportGenes, HandleFunctions):
class ImportIGCGenes(BaseImportGenes, HandleFunctions, HandleTaxonomy):
PHYLUM_COL = 'taxo_phylum'
GENUS_COL = 'taxo_genus'
SELECTED_KEYS = ['gene_id', 'length', 'kegg_ko', 'eggnog', PHYLUM_COL, GENUS_COL]
......@@ -27,35 +27,13 @@ class ImportIGCGenes(BaseImportGenes, HandleFunctions):
self.skip_tax = skip_tax
self.skip_functions = skip_functions
def _build_taxo_mapping(self, rank):
    """Return a dict of every Taxonomy of ``rank``, keyed by its name."""
    logger.info("Building local mapping for %s level...", rank)
    by_name = {}
    for taxonomy in Taxonomy.objects.filter(rank=rank):
        by_name[taxonomy.name] = taxonomy
    return by_name
@property
def phylum_mapping(self):
    """Name -> Taxonomy dict for the phylum rank, built on first access."""
    cached = getattr(self, '_phylum_mapping', None)
    if cached is None:
        cached = self._phylum_mapping = self._build_taxo_mapping("phylum")
    return cached
@property
def genus_mapping(self):
    """Name -> Taxonomy dict for the genus rank, built on first access."""
    cached = getattr(self, '_genus_mapping', None)
    if cached is None:
        cached = self._genus_mapping = self._build_taxo_mapping("genus")
    return cached
def _retrieve_taxonomy(self, genus_name, phylum_name, unknown_val='unknown'):
    """Return the Taxonomy for the genus, falling back to the phylum.

    The genus mapping is consulted first unless ``genus_name`` equals
    ``unknown_val``. If that gives nothing, the phylum mapping is consulted
    under the same rule. Returns None when neither lookup succeeds.
    """
    genus_hit = None
    if genus_name != unknown_val:
        genus_hit = self.genus_mapping.get(genus_name)
    if genus_hit is not None:
        return genus_hit
    if phylum_name != unknown_val:
        return self.phylum_mapping.get(phylum_name)
    return None
def _format_for_model(self, igc_dict):
    """Build the gene dict for the model and attach its taxonomy.

    Starts from the parent class's formatting of ``igc_dict``. Unless
    ``self.skip_tax`` is set, the genus annotation is resolved first and
    the phylum annotation is used only when the genus lookup gives nothing.
    The result, possibly None, is stored under the 'taxonomy' key.
    """
    gene_dict = super()._format_for_model(igc_dict)
    if self.skip_tax:
        return gene_dict
    # One lookup per rank. The older two-positional-argument call is dropped:
    # with the rank-based signature it passed the phylum name as ``rank``,
    # and its result was overwritten just below anyway.
    taxonomy = self._retrieve_taxonomy(igc_dict.get(self.GENUS_COL), rank="genus")
    if taxonomy is None:
        taxonomy = self._retrieve_taxonomy(igc_dict.get(self.PHYLUM_COL), rank="phylum")
    gene_dict['taxonomy'] = taxonomy
    return gene_dict
def _handle_chunk(self, chunk_genes):
......
import os
from unittest import TestCase
import mock
from rest_framework.test import APITestCase
from metagenedb.apps.catalog.models import Gene
from metagenedb.apps.catalog.management.commands.import_igc_annotation import ImportIGCGenes
from metagenedb.apps.catalog.factory import (
TaxonomyFactory,
)
from metagenedb.apps.catalog.factory.function import generate_fake_functions_db
from metagenedb.apps.catalog.factory.taxonomy import generate_simple_db
class BaseTestImportIGCGenes(TestCase):
    """Base test case exposing an ImportIGCGenes instance as ``self.import_igc_genes``."""

    def setUp(self):
        # file_len is patched to return 10 only while the importer is built.
        # Presumably the constructor calls it on the given path; confirm in
        # BaseImportGenes.
        file_len_path = 'metagenedb.apps.catalog.management.commands.commons.import_genes.file_len'
        with mock.patch(file_len_path, return_value=10):
            self.import_igc_genes = ImportIGCGenes('test')
class TestRetrieveTaxonomy(APITestCase, BaseTestImportIGCGenes):
    """Check ImportIGCGenes._retrieve_taxonomy with genus and phylum names.

    The test database holds one genus and one phylum entry created with
    TaxonomyFactory.
    """

    @classmethod
    def setUpTestData(cls):
        cls.genus = TaxonomyFactory(rank='genus')
        cls.phylum = TaxonomyFactory(rank='phylum')

    def setUp(self):
        self.unknown = 'unknown'
        super().setUp()

    def _lookup(self, genus_name, phylum_name):
        # Single entry point to the method under test.
        return self.import_igc_genes._retrieve_taxonomy(genus_name, phylum_name)

    def test_genus_only(self):
        found = self._lookup(self.genus.name, self.unknown)
        self.assertEqual(found.tax_id, self.genus.tax_id)

    def test_genus_not_in_db(self):
        self.assertEqual(self._lookup("Fake Name", self.unknown), None)

    def test_phylum_only(self):
        found = self._lookup(self.unknown, self.phylum.name)
        self.assertEqual(found.tax_id, self.phylum.tax_id)

    def test_phylum_not_in_db(self):
        self.assertEqual(self._lookup(self.unknown, "Fake Name"), None)

    def test_genus_phylum(self):
        # When both names resolve, the genus entry is the one returned.
        found = self._lookup(self.genus.name, self.phylum.name)
        self.assertEqual(found.tax_id, self.genus.tax_id)

    def test_both_unknown(self):
        self.assertEqual(self._lookup(self.unknown, self.unknown), None)
class TestEndToEnd(APITestCase):
@classmethod
......@@ -83,7 +32,7 @@ class TestEndToEnd(APITestCase):
'source': 'igc',
'length': 456,
'name': 'Gene_2',
'tax_id': '1239',
'tax_id': '1239', # Genus annotation Veillonella not in test db, but phylum yes
'functions': {
'kegg': 'K67890',
'eggnog': 'COG5678'
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment