Commit 2c1e3d6b authored by Kenzo-Hugo Hillion's avatar Kenzo-Hugo Hillion
Browse files

Add common class to perform insertion to DB

parent 61a3b8ad
Pipeline #13210 failed with stage
in 1 minute and 14 seconds
import logging
from rest_framework import status
from rest_framework.decorators import (
api_view,
......@@ -7,8 +9,28 @@ from rest_framework.decorators import (
from rest_framework.response import Response
from django.core.paginator import Paginator, EmptyPage, PageNotAnInteger
from metagenedb.apps.catalog.models import Gene
from metagenedb.apps.catalog.models import Function, Gene
from metagenedb.apps.catalog.serializers import GeneSerializer
from metagenedb.apps.catalog.views.insertion_model import InsertionBase
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
class GeneInsertion(InsertionBase):
MANY_TO_MANY_FIELDS = ['kegg_ko']
model = Gene
obj_id = "gene_id"
def _link_kegg_ko(self, function_id):
VALUE_TO_SKIP = ['unknown']
if function_id not in VALUE_TO_SKIP:
try:
function = Function.objects.get(function_id=function_id)
self.obj.functions.add(function)
self.full_clean_and_save()
except Function.DoesNotExist:
_LOGGER.warning(f"{function_id} not found in the database. Full dict: {self.full_dict}.")
@api_view(['GET'])
......
from abc import ABC
from metagenedb.utils.dict_operations import extract_dict
class InsertionBase(ABC):
"""
Base for insertion in DB for different models.
This base will be used for POST methods but also direct insertion to DB from scripts.
"""
MANY_TO_MANY_FIELDS = []
FOREIGN_KEY_FIELDS = []
@property
def model(self):
raise NotImplementedError
@property
def obj_id(self):
raise NotImplementedError
def __init__(self, model_dict):
self.full_dict = model_dict.copy()
self.foreign_key_dict = extract_dict(model_dict, self.FOREIGN_KEY_FIELDS)
self.many_to_many_dict = extract_dict(model_dict, self.MANY_TO_MANY_FIELDS)
self.simple_dict = model_dict.copy()
self.obj = None
def upsert_to_db(self):
try:
self.obj = self.model.objects.get(**{self.obj_id: self.full_dict.get(self.obj_id)})
for key, value in self.simple_dict.items():
setattr(self.obj, key, value)
except self.model.DoesNotExist:
self.create_obj()
self.full_clean_and_save()
self.handle_foreign_fields()
self.handle_many_to_many_fields()
def create_obj(self):
self.obj = self.model(**self.simple_dict)
def full_clean_and_save(self):
self.obj.full_clean()
self.obj.save()
def handle_foreign_fields(self):
for key, value in self.foreign_key_dict.items():
getattr(self, f"_link_{key}")(value)
def handle_many_to_many_fields(self):
for key, value in self.many_to_many_dict.items():
getattr(self, f"_link_{key}")(value)
......@@ -8,13 +8,12 @@ from itertools import islice
import django
from django.core.exceptions import ValidationError
from metagenedb.utils.dict_operations import extract_dict
# Before model import, we need to called django.setup() to Load apps
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "metagenedb.settings")
django.setup()
from metagenedb.apps.catalog.models import Gene, Function # noqa
from metagenedb.apps.catalog.views.gene import GeneInsertion # noqa
logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)
......@@ -47,38 +46,13 @@ def parse_gene(raw_line):
}
def link_to_function(obj_gene, gene_dict):
try:
function = Function.objects.get(function_id=gene_dict.get('kegg_ko'))
obj_gene.functions.add(function)
obj_gene.full_clean()
obj_gene.save()
except Function.DoesNotExist:
_LOGGER.warning(f"{gene_dict.get('kegg_ko')} not found in the database {gene_dict}.")
def insert_gene(gene_dict):
MANY_TO_MANY_FIELDS = ['kegg_ko']
many_to_many_elements = extract_dict(gene_dict, MANY_TO_MANY_FIELDS)
try:
obj_gene = Gene.objects.get(gene_id=gene_dict.get('gene_id'))
for key, value in gene_dict.items():
setattr(obj_gene, key, value)
except Gene.DoesNotExist:
obj_gene = Gene(gene_id=gene_dict.get('gene_id'),
gene_length=gene_dict.get('gene_length'))
obj_gene.full_clean()
obj_gene.save()
# Add link to KEGG
if many_to_many_elements.get('kegg_ko') != 'unknown':
link_to_function(obj_gene, many_to_many_elements)
def insert_gene_list(chunk_genes):
for i in chunk_genes:
for gene_line in chunk_genes:
try:
gene_dict = parse_gene(i)
insert_gene(gene_dict)
gene_dict = parse_gene(gene_line)
# insert_gene(gene_dict)
gene_insertion = GeneInsertion(gene_dict)
gene_insertion.upsert_to_db()
except ValidationError as e:
_LOGGER.warning(f"{e.__dict__} for gene_id: {gene_dict.get('gene_id')}. Insertion skipped.")
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment