Commit 220bb356 authored by Kenzo-Hugo Hillion
Browse files

refactor handling of functions

parent 415c1eb0
Pipeline #29771 passed with stages
in 3 minutes and 19 seconds
import logging
from slugify import slugify
from metagenedb.apps.catalog.models import Function, Gene, GeneFunction
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
logger = logging.getLogger(__name__)
class HandleFunctions:
    """
    Mixin gathering the function-related steps shared by the gene import commands.

    It lazily caches ``function_id -> Function`` lookups per source, splits the
    function annotations out of parsed gene dicts, drops annotations that are
    unknown, and (re)creates the Gene <-> Function links in bulk.
    """

    def _build_function_mapping(self, source):
        """
        Return a dict {function_id: Function instance} for the given source
        """
        logger.info("Building local mapping for %s function...", source)
        instances = Function.objects.filter(source=source)
        return {instance.function_id: instance for instance in instances}

    @property
    def eggnog_mapping(self):
        """
        Mapping of eggnog functions, built on first access and cached on the instance
        """
        if getattr(self, '_eggnog_mapping', None) is None:
            self._eggnog_mapping = self._build_function_mapping("eggnog")
        return self._eggnog_mapping

    @property
    def kegg_mapping(self):
        """
        Mapping of kegg functions, built on first access and cached on the instance
        """
        if getattr(self, '_kegg_mapping', None) is None:
            self._kegg_mapping = self._build_function_mapping("kegg")
        return self._kegg_mapping

    def _known_functions(self, function_ids, source, unknown_val):
        """
        Return the Function instances of `source` matching `function_ids`

        Ids equal to `unknown_val` or absent from the mapping are dropped. The
        mapping is only accessed (and therefore only built) when at least one
        candidate id remains.
        """
        candidates = [function_id for function_id in (function_ids or ()) if function_id != unknown_val]
        if not candidates:
            return []
        mapping = self.kegg_mapping if source == 'kegg' else self.eggnog_mapping
        return [mapping[function_id] for function_id in candidates if function_id in mapping]

    def _clean_functions(self, functions, unknown_val='unknown'):
        """
        Get rid of functions that are not in the db or entitled unknown

        `functions` maps a gene_id to a dict holding lists of function ids under
        the 'kegg' and/or 'eggnog' keys. A missing source is treated as having no
        annotation, so importers providing a single source can use this method.
        Genes left without any known function are omitted from the result.
        """
        cleaned_functions = {}
        for gene_id, all_functions in functions.items():
            # kegg first, then eggnog: callers and tests rely on this order
            new_functions = self._known_functions(all_functions.get('kegg'), 'kegg', unknown_val)
            new_functions += self._known_functions(all_functions.get('eggnog'), 'eggnog', unknown_val)
            if new_functions:
                cleaned_functions[gene_id] = new_functions
        return cleaned_functions

    def _remove_functions(self, gene_dicts):
        """
        Pop function annotations out of the gene dicts (modified in place)

        Returns a dict keyed by the slugified gene_id with the 'kegg' and
        'eggnog' annotations taken from the 'kegg_ko' and 'eggnog' entries.
        """
        functions = {}
        for gene_dict in gene_dicts:
            functions[slugify(gene_dict['gene_id'])] = {
                'kegg': gene_dict.pop('kegg_ko'),
                'eggnog': gene_dict.pop('eggnog')
            }
        return functions

    def _generate_gene_function_mapping(self, functions, genes):
        """
        Generate a list of GeneFunction pair to create relation between them

        `functions` maps gene_id to a list of Function instances and `genes`
        maps gene_id to Gene instances. Genes absent from `genes` are skipped
        with a warning instead of aborting the whole batch.
        """
        mapping = []
        for gene_id, function_list in functions.items():
            gene = genes.get(gene_id)
            if gene is None:
                logger.warning("Gene %s not found, skipping %s function link(s)", gene_id, len(function_list))
                continue
            mapping.extend(GeneFunction(gene=gene, function=function) for function in function_list)
        return mapping

    def link_genes_to_functions(self, functions):
        """
        Replace the function links of the given genes by the ones in `functions`

        Only genes having at least one known function are considered: their
        existing links are deleted and the new ones created in bulk.
        """
        cleaned_functions = self._clean_functions(functions)
        if not cleaned_functions:
            return
        genes = Gene.objects.in_bulk(cleaned_functions.keys(), field_name='gene_id')
        # Build the new links before deleting anything, so a failure while
        # building them cannot leave genes stripped of their existing links.
        new_links = self._generate_gene_function_mapping(cleaned_functions, genes)
        # Get all link with corresponding genes & Delete them
        GeneFunction.objects.filter(gene__in=genes.values()).delete()
        # Create the function <-> gene links in one go
        GeneFunction.objects.bulk_create(new_links)
from unittest import TestCase
from rest_framework.test import APITestCase
from metagenedb.apps.catalog.models import GeneFunction
from metagenedb.apps.catalog.management.commands.commons.handle_functions import HandleFunctions
from metagenedb.apps.catalog.factory import (
FunctionFactory,
GeneFactory,
)
class BaseTestHandleFunctions(TestCase):
    """Common fixture: gives each test its own HandleFunctions instance."""

    def setUp(self):
        # setUp runs before every test, so each one starts from a new instance
        self.handle_functions = HandleFunctions()
class TestRemoveFunctions(BaseTestHandleFunctions):
    """Check that function annotations are split out of the parsed gene dicts."""

    def test_remove_functions(self):
        gene_dicts = [
            {'gene_id': 'Test_gene', 'kegg_ko': ['K0001'], 'eggnog': ['COG1', 'COG2']},
        ]
        # The result is keyed by the slugified gene_id ('Test_gene' -> 'test-gene')
        expected = {
            'test-gene': {'kegg': ['K0001'], 'eggnog': ['COG1', 'COG2']},
        }
        result = self.handle_functions._remove_functions(gene_dicts)
        self.assertDictEqual(result, expected)
class TestCleanFunctions(APITestCase, BaseTestHandleFunctions):
    """Check that unknown functions and functions absent from the db are dropped."""

    @classmethod
    def setUpTestData(cls):
        # One function of each source is present in the db
        cls.kegg = FunctionFactory(source='kegg')
        cls.eggnog = FunctionFactory(source='eggnog')

    def _assert_cleaned(self, kegg_ids, eggnog_ids, expected):
        """Clean the annotations of a single gene and compare with `expected`."""
        functions = {
            'gene-kegg': {'kegg': kegg_ids, 'eggnog': eggnog_ids},
        }
        cleaned = self.handle_functions._clean_functions(functions)
        self.assertDictEqual(cleaned, expected)

    def test_clean_functions_kegg_only(self):
        self._assert_cleaned(
            kegg_ids=[self.kegg.function_id, 'KO12345'],
            eggnog_ids=['unknown'],
            expected={'gene-kegg': [self.kegg]},
        )

    def test_clean_functions_eggnog_only(self):
        self._assert_cleaned(
            kegg_ids=['unknown'],
            eggnog_ids=[self.eggnog.function_id, 'COG12345'],
            expected={'gene-kegg': [self.eggnog]},
        )

    def test_clean_functions_kegg_eggnog(self):
        self._assert_cleaned(
            kegg_ids=[self.kegg.function_id, 'KO12345'],
            eggnog_ids=[self.eggnog.function_id, 'COG12345'],
            expected={'gene-kegg': [self.kegg, self.eggnog]},
        )

    def test_clean_functions_both_unknown(self):
        # A gene without any known function is left out of the result
        self._assert_cleaned(
            kegg_ids=['unknown'],
            eggnog_ids=['unknown'],
            expected={},
        )
class TestLinkGenesToFunctions(APITestCase, BaseTestHandleFunctions):
    """Check that GeneFunction links are created for a gene and its known functions."""

    @classmethod
    def setUpTestData(cls):
        cls.kegg = FunctionFactory(source='kegg')
        cls.eggnog = FunctionFactory(source='eggnog')
        cls.gene = GeneFactory()

    def test_link_kegg_and_eggnog(self):
        # No link exists before the call
        links_before = GeneFunction.objects.all().count()
        self.assertEqual(links_before, 0)
        annotations = {
            'kegg': [self.kegg.function_id],
            'eggnog': [self.eggnog.function_id]
        }
        self.handle_functions.link_genes_to_functions({self.gene.gene_id: annotations})
        # One link per function, all attached to the same gene
        created_links = GeneFunction.objects.all()
        self.assertEqual(created_links.count(), 2)
        for created_link in created_links:
            self.assertEqual(created_link.gene.gene_id, self.gene.gene_id)
......@@ -3,15 +3,16 @@ import logging
from django.core.management.base import BaseCommand
from slugify import slugify
from metagenedb.apps.catalog.management.commands.commons.handle_functions import HandleFunctions
from metagenedb.apps.catalog.management.commands.commons.import_genes import BaseImportGenes
from metagenedb.apps.catalog.models import Function, Gene, GeneFunction, Taxonomy
from metagenedb.apps.catalog.models import Taxonomy
from metagenedb.common.utils.parsers import IGCLineParser
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
logger = logging.getLogger(__name__)
class ImportIGCGenes(BaseImportGenes):
class ImportIGCGenes(BaseImportGenes, HandleFunctions):
PHYLUM_COL = 'taxo_phylum'
GENUS_COL = 'taxo_genus'
SELECTED_KEYS = ['gene_id', 'length', 'kegg_ko', 'eggnog', PHYLUM_COL, GENUS_COL]
......@@ -43,23 +44,6 @@ class ImportIGCGenes(BaseImportGenes):
self._genus_mapping = self._build_taxo_mapping("genus")
return self._genus_mapping
def _build_function_mapping(self, source):
logger.info("Building local mapping for %s function...", source)
instances = Function.objects.filter(source=source)
return {instance.function_id: instance for instance in instances}
@property
def eggnog_mapping(self):
if getattr(self, '_eggnog_mapping', None) is None:
self._eggnog_mapping = self._build_function_mapping("eggnog")
return self._eggnog_mapping
@property
def kegg_mapping(self):
if getattr(self, '_kegg_mapping', None) is None:
self._kegg_mapping = self._build_function_mapping("kegg")
return self._kegg_mapping
def _retrieve_taxonomy(self, genus_name, phylum_name, unknown_val='unknown'):
taxonomy_instance = None
if genus_name != unknown_val:
......@@ -68,62 +52,12 @@ class ImportIGCGenes(BaseImportGenes):
taxonomy_instance = self.phylum_mapping.get(phylum_name, None)
return taxonomy_instance
def _remove_functions(self, gene_dicts):
functions = {}
for gene_dict in gene_dicts:
functions[slugify(gene_dict['gene_id'])] = {
'kegg': gene_dict.pop('kegg_ko'),
'eggnog': gene_dict.pop('eggnog')
}
return functions
def _format_for_model(self, igc_dict):
gene_dict = super()._format_for_model(igc_dict)
if not self.skip_tax:
gene_dict['taxonomy'] = self._retrieve_taxonomy(igc_dict.get('taxo_genus'), igc_dict.get('taxo_phylum'))
return gene_dict
def _clean_functions(self, functions, unknown_val='unknown'):
"""
Get rid of functions that are not in the db or entitled unknown
"""
cleaned_functions = {}
for gene_id, all_functions in functions.items():
new_functions = []
for kegg in all_functions['kegg']:
if kegg == unknown_val:
continue
elif kegg in self.kegg_mapping.keys():
new_functions.append(self.kegg_mapping[kegg])
for eggnog in all_functions['eggnog']:
if eggnog == unknown_val:
continue
elif eggnog in self.eggnog_mapping.keys():
new_functions.append(self.eggnog_mapping[eggnog])
if new_functions:
cleaned_functions[gene_id] = new_functions
return cleaned_functions
def _generate_gene_function_mapping(self, functions, genes):
"""
Generate a list of GeneFunction pair to create relation between them
"""
mapping = []
for gene_id, function_list in functions.items():
for function in function_list:
mapping.append(GeneFunction(gene=genes[gene_id], function=function))
return mapping
def link_genes_to_functions(self, functions):
cleaned_functions = self._clean_functions(functions)
genes = Gene.objects.in_bulk(cleaned_functions.keys(), field_name='gene_id')
# Get all link with corresponding genes & Delete them
GeneFunction.objects.filter(gene__in=genes.values()).delete()
# Generate table for bulk_create of function <-> gene and create it
GeneFunction.objects.bulk_create(
self._generate_gene_function_mapping(cleaned_functions, genes)
)
def _handle_chunk(self, chunk_genes):
gene_dict_list = [self._parse_gene(i) for i in chunk_genes]
functions = self._remove_functions(gene_dict_list)
......
......@@ -3,15 +3,15 @@ import logging
from django.core.management.base import BaseCommand
from slugify import slugify
from metagenedb.apps.catalog.management.commands.commons.handle_functions import HandleFunctions
from metagenedb.apps.catalog.management.commands.commons.import_genes import BaseImportGenes
from metagenedb.apps.catalog.models import Function, Gene, GeneFunction
from metagenedb.common.utils.parsers import VirgoKEGGLineParser
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
logger = logging.getLogger(__name__)
class ImportVirgoGeneKeggAnnotation(BaseImportGenes):
class ImportVirgoGeneKeggAnnotation(BaseImportGenes, HandleFunctions):
IMPORT_TYPE = "Virgo KEGG annotations" # For logs
SELECTED_KEYS = ['gene_id', 'kegg_ko']
......@@ -19,27 +19,6 @@ class ImportVirgoGeneKeggAnnotation(BaseImportGenes):
SOURCE = 'virgo'
PARSER = VirgoKEGGLineParser
def _build_function_mapping(self, source):
logger.info("Building local mapping for %s functions...", source)
instances = Function.objects.filter(source=source)
return {instance.function_id: instance for instance in instances}
@property
def kegg_mapping(self):
if getattr(self, '_kegg_mapping', None) is None:
self._kegg_mapping = self._build_function_mapping("kegg")
return self._kegg_mapping
def _generate_gene_function_mapping(self, functions, genes):
"""
Generate a list of GeneFunction pair to create relation between them
"""
mapping = []
for gene_id, function_list in functions.items():
for function in function_list:
mapping.append(GeneFunction(gene=genes[gene_id], function=function))
return mapping
def _clean_functions(self, functions, unknown_val='unknown'):
"""
Get rid of functions that are not in the db or entitled unknown
......@@ -56,16 +35,6 @@ class ImportVirgoGeneKeggAnnotation(BaseImportGenes):
cleaned_functions[gene_id] = new_functions
return cleaned_functions
def link_genes_to_functions(self, functions):
cleaned_functions = self._clean_functions(functions)
genes = Gene.objects.in_bulk(cleaned_functions.keys(), field_name='gene_id')
# Get all link with corresponding genes & Delete them
GeneFunction.objects.filter(gene__in=genes.values()).delete()
# Generate table for bulk_create of function <-> gene and create it
GeneFunction.objects.bulk_create(
self._generate_gene_function_mapping(cleaned_functions, genes)
)
def _remove_functions(self, gene_dicts):
functions = {}
for gene_dict in gene_dicts:
......
......@@ -4,11 +4,9 @@ from unittest import TestCase
import mock
from rest_framework.test import APITestCase
from metagenedb.apps.catalog.models import Gene, GeneFunction
from metagenedb.apps.catalog.models import Gene
from metagenedb.apps.catalog.management.commands.import_igc_annotation import ImportIGCGenes
from metagenedb.apps.catalog.factory import (
FunctionFactory,
GeneFactory,
TaxonomyFactory,
)
from metagenedb.apps.catalog.factory.function import generate_fake_functions_db
......@@ -60,101 +58,6 @@ class TestRetrieveTaxonomy(APITestCase, BaseTestImportIGCGenes):
self.assertEqual(tested_taxonomy, None)
class TestRemoveFunctions(BaseTestImportIGCGenes):
def test_remove_functions(self):
input_dicts = [{
'gene_id': 'Test_gene',
'kegg_ko': ['K0001'],
'eggnog': ['COG1', 'COG2']
}]
expected_functions = {
'test-gene': {
'kegg': ['K0001'],
'eggnog': ['COG1', 'COG2']
}
}
tested_dict = self.import_igc_genes._remove_functions(input_dicts)
self.assertDictEqual(tested_dict, expected_functions)
class TestRemoveUnknownFunctions(APITestCase, BaseTestImportIGCGenes):
@classmethod
def setUpTestData(cls):
cls.kegg = FunctionFactory(source='kegg')
cls.eggnog = FunctionFactory(source='eggnog')
def test_clean_functions_kegg_only(self):
functions = {
'gene-kegg': {
'kegg': [self.kegg.function_id, 'KO12345'],
'eggnog': ['unknown']
},
}
expected_functions = {
'gene-kegg': [self.kegg]
}
self.assertDictEqual(self.import_igc_genes._clean_functions(functions), expected_functions)
def test_clean_functions_eggnog_only(self):
functions = {
'gene-kegg': {
'kegg': ['unknown'],
'eggnog': [self.eggnog.function_id, 'COG12345']
},
}
expected_functions = {
'gene-kegg': [self.eggnog]
}
self.assertDictEqual(self.import_igc_genes._clean_functions(functions), expected_functions)
def test_clean_functions_kegg_eggnog(self):
functions = {
'gene-kegg': {
'kegg': [self.kegg.function_id, 'KO12345'],
'eggnog': [self.eggnog.function_id, 'COG12345']
},
}
expected_functions = {
'gene-kegg': [self.kegg, self.eggnog]
}
self.assertDictEqual(self.import_igc_genes._clean_functions(functions), expected_functions)
def test_clean_functions_both_unknown(self):
functions = {
'gene-kegg': {
'kegg': ['unknown'],
'eggnog': ['unknown']
},
}
expected_functions = {}
self.assertDictEqual(self.import_igc_genes._clean_functions(functions), expected_functions)
class TestLinkGenesToFunctions(APITestCase, BaseTestImportIGCGenes):
@classmethod
def setUpTestData(cls):
cls.kegg = FunctionFactory(source='kegg')
cls.eggnog = FunctionFactory(source='eggnog')
cls.gene = GeneFactory()
def test_link_kegg_and_eggnog(self):
self.assertEqual(GeneFunction.objects.all().count(), 0)
functions = {
self.gene.gene_id: {
'kegg': [self.kegg.function_id],
'eggnog': [self.eggnog.function_id]
}
}
self.import_igc_genes.link_genes_to_functions(functions)
gene_functions = GeneFunction.objects.all()
self.assertEqual(gene_functions.count(), 2)
for link in gene_functions:
self.assertEqual(link.gene.gene_id, self.gene.gene_id)
class TestEndToEnd(APITestCase):
@classmethod
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment