Commit 55f6ce95 authored by Kenzo-Hugo Hillion
Browse files

add taxonomy annotations for VIRGO genes

parent a6bc8b0f
Pipeline #30273 passed with stages
in 3 minutes and 22 seconds
......@@ -7,6 +7,7 @@ logger = logging.getLogger(__name__)
class HandleTaxonomy:
MANUAL_TAXO_MAPPING = {}
def _build_taxo_mapping(self, rank):
logger.info("Building local mapping for %s level...", rank)
......@@ -31,6 +32,18 @@ class HandleTaxonomy:
self._species_mapping = self._build_taxo_mapping("species")
return self._species_mapping
def _build_manual_mapping(self):
    """Return a dict mapping each MANUAL_TAXO_MAPPING key to its Taxonomy instance.

    Every tax_id listed in ``MANUAL_TAXO_MAPPING`` is fetched with
    ``Taxonomy.objects.get``.
    """
    return {
        label: Taxonomy.objects.get(tax_id=tax_id)
        for label, tax_id in self.MANUAL_TAXO_MAPPING.items()
    }
@property
def manual_mapping(self):
    """Manual mapping, built on first access and then reused from ``_manual_mapping``."""
    cached = getattr(self, '_manual_mapping', None)
    if cached is not None:
        return cached
    # Nothing stored yet (or stored as None): build it and keep it on the instance.
    self._manual_mapping = self._build_manual_mapping()
    return self._manual_mapping
def _retrieve_taxonomy(self, name, rank='species', unknown_val='unknown'):
taxonomy_instance = None
if name != unknown_val:
......
......@@ -34,11 +34,11 @@ class BaseImportGenes(object):
selected_dict = {k: v for k, v in all_dict.items() if k in self.SELECTED_KEYS}
return selected_dict
def _format_for_model(self, igc_dict):
def _format_for_model(self, ori_gene_dict):
gene_dict = {}
gene_dict['gene_id'] = slugify(igc_dict['gene_id'])
gene_dict['name'] = igc_dict['gene_id']
gene_dict['length'] = igc_dict['length']
gene_dict['gene_id'] = slugify(ori_gene_dict['gene_id'])
gene_dict['name'] = ori_gene_dict['gene_id']
gene_dict['length'] = ori_gene_dict['length']
gene_dict['source'] = self.SOURCE
return gene_dict
......
......@@ -41,3 +41,12 @@ class TestRetrieveTaxonomy(APITestCase):
def test_both_unknown(self):
    """Looking up the unknown placeholder must not return any taxonomy."""
    self.assertEqual(self.handle_taxonomy._retrieve_taxonomy(self.unknown), None)
def test_build_manual_mapping(self):
    """A key declared in MANUAL_TAXO_MAPPING is resolved through rank='manual'."""
    expected_tax_id = self.genus.tax_id
    self.handle_taxonomy.MANUAL_TAXO_MAPPING = {'test_manual': expected_tax_id}
    retrieved = self.handle_taxonomy._retrieve_taxonomy(
        'test_manual', rank='manual', unknown_val=self.unknown
    )
    self.assertEqual(retrieved.tax_id, expected_tax_id)
......@@ -17,7 +17,7 @@ class ImportIGCGenes(BaseImportGenes, HandleFunctions, HandleTaxonomy):
GENUS_COL = 'taxo_genus'
SELECTED_KEYS = ['gene_id', 'length', 'kegg_ko', 'eggnog', PHYLUM_COL, GENUS_COL]
IMPORT_TYPE = "IGC genes" # For logs
UPDATED_FIELDS = ['length', 'name', 'source']
UPDATED_FIELDS = ['length', 'name', 'source', 'taxonomy']
SOURCE = 'igc'
PARSER = IGCLineParser
......
......@@ -45,13 +45,13 @@ class ImportVirgoGeneEggNOGAnnotation(BaseImportGenes, HandleFunctions):
}
return functions
def _format_for_model(self, igc_dict):
def _format_for_model(self, ori_gene_dict):
"""
@TODO remove in the future and makes function from parent class more modulable
"""
gene_dict = {}
gene_dict['gene_id'] = slugify(igc_dict['gene_id'])
gene_dict['name'] = igc_dict['gene_id']
gene_dict['gene_id'] = slugify(ori_gene_dict['gene_id'])
gene_dict['name'] = ori_gene_dict['gene_id']
gene_dict['source'] = self.SOURCE
return gene_dict
......
......@@ -45,13 +45,13 @@ class ImportVirgoGeneKeggAnnotation(BaseImportGenes, HandleFunctions):
}
return functions
def _format_for_model(self, igc_dict):
def _format_for_model(self, ori_gene_dict):
"""
@TODO remove in the future and makes function from parent class more modulable
"""
gene_dict = {}
gene_dict['gene_id'] = slugify(igc_dict['gene_id'])
gene_dict['name'] = igc_dict['gene_id']
gene_dict['gene_id'] = slugify(ori_gene_dict['gene_id'])
gene_dict['name'] = ori_gene_dict['gene_id']
gene_dict['source'] = self.SOURCE
return gene_dict
......
import logging
from django.core.management.base import BaseCommand
from slugify import slugify
from metagenedb.apps.catalog.management.commands.commons.handle_taxonomy import HandleTaxonomy
from metagenedb.apps.catalog.management.commands.commons.import_genes import BaseImportGenes
from metagenedb.common.utils.parsers import VirgoTaxonomyLineParser
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
logger = logging.getLogger(__name__)
class ImportVirgoGeneTaxonomyAnnotation(BaseImportGenes, HandleTaxonomy):
    """Import taxonomy annotations for Virgo genes.

    Lines are parsed with ``VirgoTaxonomyLineParser``; for each gene the
    taxonomy label is resolved to a taxonomy entry through
    ``_retrieve_taxonomy`` using several successive fallbacks.
    """

    SELECTED_KEYS = ['gene_id', 'taxonomy']
    IMPORT_TYPE = "Virgo taxonomy annotations"  # For logs
    UPDATED_FIELDS = ['name', 'taxonomy']
    SOURCE = 'virgo'
    PARSER = VirgoTaxonomyLineParser
    # Hand-curated labels (after '_' -> ' ' replacement) mapped to a tax_id,
    # used as the last fallback (rank="manual").
    MANUAL_TAXO_MAPPING = {
        'BVAB1': '699240',
        'Clostridiales Family': '186802',
        'Chlamydophila psittaci': '83554'
    }

    def _lookup_virgo_taxonomy(self, taxonomy_term):
        """Return the first taxonomy found for ``taxonomy_term``, or None.

        Lookups are tried in this order, stopping at the first hit:
        1. the full term as a species,
        2. the first word of the term as a genus,
        3. the full term as a genus,
        4. the full term in the manually created mapping.
        """
        # NOTE(review): the manual mapping is consulted last, so a genus hit on
        # the first word wins over a manual entry — confirm this is intended.
        lookups = (
            (taxonomy_term, "species"),
            (taxonomy_term.split(' ')[0], "genus"),
            (taxonomy_term, "genus"),
            (taxonomy_term, "manual"),
        )
        for name, rank in lookups:
            taxonomy = self._retrieve_taxonomy(name, rank=rank)
            if taxonomy is not None:
                return taxonomy
        # @TODO need to find a way of handling other cases
        return None

    def _format_for_model(self, ori_gene_dict):
        """Build the dict of model fields for one parsed Virgo taxonomy line.

        ``ori_gene_dict`` must contain ``gene_id`` and normally ``taxonomy``.
        The returned dict holds ``gene_id`` (slugified), ``name``, ``source``
        and ``taxonomy``; ``taxonomy`` is None when nothing could be resolved,
        in which case the gene is counted as skipped and a warning is logged.

        @TODO remove in the future and makes function from parent class more modulable
        """
        gene_dict = {}
        gene_dict['gene_id'] = slugify(ori_gene_dict['gene_id'])
        gene_dict['name'] = ori_gene_dict['gene_id']
        gene_dict['source'] = self.SOURCE
        raw_taxonomy = ori_gene_dict.get('taxonomy')
        taxonomy = None
        if raw_taxonomy is not None:
            # Virgo labels use '_' as word separator (e.g. Escherichia_coli).
            taxonomy = self._lookup_virgo_taxonomy(raw_taxonomy.replace('_', ' '))
        if taxonomy is None:
            # skipped_genes is expected to be initialised outside this class — TODO confirm
            self.skipped_genes += 1
            logger.warning("Could not retrieve %s for %s", raw_taxonomy, ori_gene_dict['gene_id'])
        gene_dict['taxonomy'] = taxonomy
        return gene_dict
class Command(BaseCommand):
    """Management command running ``ImportVirgoGeneTaxonomyAnnotation`` on a given file."""

    help = 'Create or update all Taxonomy annotations for Virgo genes (from `1.taxon.tbl.txt` file).'

    def add_arguments(self, parser):
        """Register the positional ``annotation`` file and the ``--test`` flag."""
        parser.add_argument(
            'annotation',
            help='1.taxon.tbl.txt file from Virgo. Genes need to exist in DB for this script to work.'
        )
        parser.add_argument('--test', action='store_true', help='Run only on first 10000 entries.')

    def set_logger_level(self, verbosity):
        """Set the module logger to DEBUG above verbosity 2, INFO above 1, else leave it as is."""
        if verbosity <= 1:
            return
        level = logging.DEBUG if verbosity > 2 else logging.INFO
        logger.setLevel(level)

    def handle(self, *args, **options):
        """Configure logging, then load every annotation from the given file."""
        verbosity = int(options['verbosity'])
        self.set_logger_level(verbosity)
        importer = ImportVirgoGeneTaxonomyAnnotation(options['annotation'])
        importer.load_all(test=options['test'])
Cluster_566081 V1 Escherichia_coli 396
Cluster_308979 V2 Lactobacillus_iners 783
import os
from rest_framework.test import APITestCase
from metagenedb.apps.catalog.models import Gene
from metagenedb.apps.catalog.management.commands.import_virgo_taxonomy import ImportVirgoGeneTaxonomyAnnotation
from metagenedb.apps.catalog.factory import GeneFactory
from metagenedb.apps.catalog.factory.taxonomy import generate_simple_db
class TestEndToEnd(APITestCase):
    """Run the Virgo taxonomy import on a small fixture file and check the stored genes."""

    @classmethod
    def setUpTestData(cls):
        # The import updates existing genes, so create them (and the taxonomy DB) first.
        generate_simple_db()
        for gene_id in ('v1', 'v2'):
            GeneFactory.create(gene_id=gene_id)

    def test_end_to_end(self):
        """Each gene in DB carries the expected name and taxonomy tax_id after import."""
        expected_genes = {
            'v1': {
                'name': 'V1',
                'tax_id': '562',
            },
            'v2': {
                'name': 'V2',
                'tax_id': '1578',
            }
        }
        test_file = os.path.join(os.path.dirname(__file__), "./test_files/virgo_taxonomy.tsv")
        ImportVirgoGeneTaxonomyAnnotation(test_file).load_all()
        for created_gene in Gene.objects.all().prefetch_related('functions'):
            self.assertEqual(created_gene.name, expected_genes[created_gene.gene_id]['name'])
            self.assertEqual(created_gene.taxonomy.tax_id, expected_genes[created_gene.gene_id]['tax_id'])
......@@ -2,4 +2,6 @@ from .eggnog import EggNOGAnnotationLineParser # noqa
from .igc import IGCLineParser # noqa
from .kegg import KEGGLineParser # noqa
from .ncbi_taxonomy import NCBITaxonomyLineParser # noqa
from .virgo import VirgoGeneLengthLineParser, VirgoKEGGLineParser, VirgoEggNOGLineParser # noqa
from .virgo import ( # noqa
VirgoGeneLengthLineParser, VirgoKEGGLineParser, VirgoEggNOGLineParser, VirgoTaxonomyLineParser
)
from unittest import TestCase
from metagenedb.common.utils.parsers import (
VirgoGeneLengthLineParser, VirgoKEGGLineParser, VirgoEggNOGLineParser
VirgoGeneLengthLineParser, VirgoKEGGLineParser, VirgoEggNOGLineParser, VirgoTaxonomyLineParser
)
......@@ -103,3 +103,28 @@ class TestVirgoEggNOGLineParser(TestCase):
raw_line = "This is a wrong line format, with; information and tab"
with self.assertRaises(Exception) as context: # noqa
VirgoEggNOGLineParser.gene(raw_line)
class TestVirgoTaxonomyLineParser(TestCase):
    """Unit tests for ``VirgoTaxonomyLineParser.gene``."""

    def test_gene(self):
        """A well-formed tab-separated line is split into the four named columns."""
        raw_data = [
            'cluster_id',
            'gene_id',
            'taxonomy',
            '1234',
        ]
        raw_line = "\t".join(raw_data)
        expected_dict = {
            'cluster_id': raw_data[0],
            'gene_id': raw_data[1],
            'taxonomy': raw_data[2],
            'length': raw_data[3],
        }
        test_dict = VirgoTaxonomyLineParser.gene(raw_line)
        self.assertDictEqual(test_dict, expected_dict)

    def test_gene_wrong_format(self):
        """A line without tab-separated columns makes the taxonomy parser raise."""
        raw_line = "This is a wrong line format, with; information and tab"
        # Exercise the taxonomy parser itself (the EggNOG parser was called here by mistake).
        with self.assertRaises(Exception):
            VirgoTaxonomyLineParser.gene(raw_line)
......@@ -79,5 +79,31 @@ class VirgoEggNOGLineParser(object):
'eggnog': gene_info[6],
}
except Exception:
_LOGGER.error(f"Could not parse: {line.rstrip()}. Are you sure it comes from Virgo KEGG annotation file?")
_LOGGER.error(f"Could not parse: {line.rstrip()}. Are you sure it comes from Virgo EggNOG annotation file?")
raise
class VirgoTaxonomyLineParser(object):
    """Parser for lines of the Virgo taxonomy annotation file (1.taxon.tbl.txt)."""

    @staticmethod
    def gene(line):
        """
        Parse line from Virgo Taxonomy annotations to return organized dict (1.taxon.tbl.txt)

        Virgo taxonomy annotation columns (tab-separated):
            0: Cluster ID
            1: Gene ID
            2: Taxonomy annotation
            3: Gene length

        Returns a dict with keys ``cluster_id``, ``gene_id``, ``taxonomy`` and
        ``length``; all values are kept as strings. Columns after the fourth
        are ignored.

        Any exception raised while parsing (e.g. IndexError when the line has
        fewer than four columns) is logged and re-raised.
        """
        try:
            # rstrip() drops the trailing newline (and any other trailing whitespace).
            gene_info = line.rstrip().split('\t')
            return {
                'cluster_id': gene_info[0],
                'gene_id': gene_info[1],
                'taxonomy': gene_info[2],
                'length': gene_info[3],
            }
        except Exception:
            _LOGGER.error(f"Could not parse: {line.rstrip()}. Are you sure it comes from Virgo taxonomy file?")
            raise
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment