Commit e346db16 authored by Kenzo-Hugo Hillion's avatar Kenzo-Hugo Hillion
Browse files

add script to add EggNOG annotations from VIRGO

parent 534effc0
Pipeline #29868 passed with stages
in 3 minutes and 17 seconds
import logging
from django.core.management.base import BaseCommand
from slugify import slugify
from metagenedb.apps.catalog.management.commands.commons.handle_functions import HandleFunctions
from metagenedb.apps.catalog.management.commands.commons.import_genes import BaseImportGenes
from metagenedb.common.utils.parsers import VirgoEggNOGLineParser
# Configure the root logger's output format: timestamp, level, logger name, message.
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
# Module-level logger; its level is adjusted later from the command's verbosity option.
logger = logging.getLogger(__name__)
class ImportVirgoGeneEggNOGAnnotation(BaseImportGenes, HandleFunctions):
    """
    Import EggNOG annotations of Virgo genes.

    For every chunk of parsed entries, the 'eggnog' annotation is split off
    from the gene dicts, the remaining gene data is passed to
    ``create_or_update_genes`` and the annotations are passed to
    ``link_genes_to_functions``.

    NOTE(review): ``_parse_gene``, ``create_or_update_genes``,
    ``link_genes_to_functions`` and ``eggnog_mapping`` are not defined in this
    class; they are presumably provided by the parent classes -- confirm there.
    """

    IMPORT_TYPE = "Virgo EggNOG annotations"  # For logs
    SELECTED_KEYS = ['gene_id', 'eggnog']
    UPDATED_FIELDS = ['name']
    SOURCE = 'virgo'
    PARSER = VirgoEggNOGLineParser

    def _clean_functions(self, functions, unknown_val='unknown'):
        """
        Get rid of functions that are not in the db or entitled unknown

        :param functions: dict mapping a gene_id to a dict holding an 'eggnog' entry
        :param unknown_val: annotation value meaning "no annotation"; such genes are skipped
        :return: dict mapping a gene_id to a one-element list containing the
            ``self.eggnog_mapping`` value of its annotation. Genes whose
            annotation equals ``unknown_val`` or is absent from the mapping
            are left out.
        """
        cleaned_functions = {}
        for gene_id, all_functions in functions.items():
            eggnog_annotation = all_functions['eggnog']
            if eggnog_annotation == unknown_val:
                continue
            # Test membership on the mapping itself; a `.keys()` view is not needed.
            if eggnog_annotation in self.eggnog_mapping:
                cleaned_functions[gene_id] = [self.eggnog_mapping[eggnog_annotation]]
        return cleaned_functions

    def _remove_functions(self, gene_dicts):
        """
        Extract the 'eggnog' entry from every gene dict.

        The entry is popped, so the dicts in ``gene_dicts`` are modified in
        place and no longer contain 'eggnog' afterwards.

        :param gene_dicts: iterable of dicts with 'gene_id' and 'eggnog' keys
        :return: dict mapping the slugified gene_id to ``{'eggnog': <annotation>}``
        """
        functions = {}
        for gene_dict in gene_dicts:
            functions[slugify(gene_dict['gene_id'])] = {
                'eggnog': gene_dict.pop('eggnog'),
            }
        return functions

    def _format_for_model(self, igc_dict):
        """
        Build the dict describing one gene: slugified 'gene_id', raw id as
        'name' and ``self.SOURCE`` as 'source'.

        @TODO remove in the future and makes function from parent class more modulable
        """
        return {
            'gene_id': slugify(igc_dict['gene_id']),
            'name': igc_dict['gene_id'],
            'source': self.SOURCE,
        }

    def _handle_chunk(self, chunk_genes):
        """
        Override for all different sources

        Parses every entry of the chunk, separates the annotations from the
        gene data, then hands genes and annotations to the two handlers.
        """
        gene_dict_list = [self._parse_gene(i) for i in chunk_genes]
        functions = self._remove_functions(gene_dict_list)
        # The formatted dict already carries the slugified id, so it is reused
        # as the key instead of slugifying every gene id a second time.
        formatted_genes = (self._format_for_model(i) for i in gene_dict_list)
        gene_clean_dict = {gene['gene_id']: gene for gene in formatted_genes}
        self.create_or_update_genes(gene_clean_dict)
        self.link_genes_to_functions(functions)
class Command(BaseCommand):
    """Management command running ``ImportVirgoGeneEggNOGAnnotation`` on a given file."""

    help = 'Create or update all EggNOG annotation for Virgo genes (from `3.eggnog.NOG.txt` file).'

    def add_arguments(self, parser):
        """Register the positional ``annotation`` argument and the ``--test`` flag."""
        annotation_help = '3.eggnog.NOG.txt file from Virgo. Genes need to exist in DB for this script to work.'
        parser.add_argument('annotation', help=annotation_help)
        parser.add_argument(
            '--test',
            action='store_true',
            help='Run only on first 10000 entries.',
        )

    def set_logger_level(self, verbosity):
        """
        Set the module logger to DEBUG when ``verbosity`` is above 2, to INFO
        when it is above 1, and leave it untouched otherwise.
        """
        if verbosity > 2:
            level = logging.DEBUG
        elif verbosity > 1:
            level = logging.INFO
        else:
            return
        logger.setLevel(level)

    def handle(self, *args, **options):
        """
        Apply the requested verbosity, then build the importer on the
        ``annotation`` option and run ``load_all`` with the ``test`` option.
        """
        self.set_logger_level(int(options['verbosity']))
        annotation_file = options['annotation']
        importer = ImportVirgoGeneEggNOGAnnotation(annotation_file)
        importer.load_all(test=options['test'])
Cluster_566081 V1 RPSI map03010 J 30S ribosomal protein S9 COG1234
Cluster_308979 V2 TRUA J Formation of pseudouridine at positions 38, 39 and 40 in the anticodon stem and loop of transfer RNAs (By similarity) COG5678
import os
from rest_framework.test import APITestCase
from metagenedb.apps.catalog.models import Gene
from metagenedb.apps.catalog.management.commands.import_virgo_eggnog import ImportVirgoGeneEggNOGAnnotation
from metagenedb.apps.catalog.factory import GeneFactory
from metagenedb.apps.catalog.factory.function import generate_fake_functions_db
class TestEndToEnd(APITestCase):
    """Run the Virgo EggNOG import on a fixture file and check the resulting genes."""

    @classmethod
    def setUpTestData(cls):
        """Create the fake functions and the two genes the fixture file refers to."""
        generate_fake_functions_db()
        for gene_id in ("v1", "v2"):
            GeneFactory.create(gene_id=gene_id)

    def test_end_to_end(self):
        """Every gene in the DB must carry the expected name and EggNOG function."""
        expected_genes = {
            'v1': {'name': 'V1', 'functions': {'eggnog': 'COG1234'}},
            'v2': {'name': 'V2', 'functions': {'eggnog': 'COG5678'}},
        }
        test_file = os.path.join(os.path.dirname(__file__), "./test_files/virgo_eggnog.tsv")
        loader = ImportVirgoGeneEggNOGAnnotation(test_file)
        loader.load_all()
        for created_gene in Gene.objects.all().prefetch_related('functions'):
            expected = expected_genes[created_gene.gene_id]
            self.assertEqual(created_gene.name, expected['name'])
            # Check functions
            linked_functions = created_gene.functions.all()
            self.assertTrue(linked_functions)
            for function in linked_functions:
                self.assertIn(function.source, ['kegg', 'eggnog'])
                self.assertEqual(function.function_id, expected['functions'][function.source])
......@@ -38,6 +38,7 @@ class TestEndToEnd(APITestCase):
for created_gene in created_genes:
self.assertEqual(getattr(created_gene, 'name'), expected_genes[created_gene.gene_id]['name'])
# Check functions
self.assertTrue(created_gene.functions.all())
for function in created_gene.functions.all():
self.assertIn(function.source, ['kegg', 'eggnog'])
self.assertEqual(
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment