Commit 1a863324 authored by Kenzo-Hugo Hillion
Browse files

add end to end test for sequences creation of genes

parent bc356009
Pipeline #29805 passed with stages
in 3 minutes and 29 seconds
import logging
import pyfastx
from slugify import slugify
from metagenedb.apps.catalog.models import Gene
# Default log record layout: [timestamp] LEVEL:logger-name:message.
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ImportGeneSequences(object):
    """Update the ``sequence`` field of existing ``Gene`` rows from a FASTA file.

    Genes are matched on ``gene_id``; entries of the file that have no
    matching gene are counted as skipped. Subclasses override ``CATALOG``,
    the label used in the start-up log message.
    """

    CATALOG = "CAT_NAME"

    def __init__(self, sequence_file):
        """Store the path of the FASTA file and zero the counters."""
        self.sequence_file = sequence_file
        self._reset_counters()

    def _reset_counters(self):
        """Set the processed / updated / skipped counters back to zero."""
        self.processed_genes = 0
        self.updated_genes = 0
        self.skipped_genes = 0

    def update_sequences(self, sequences):
        """Write the given sequences to the matching genes in one bulk update.

        Args:
            sequences: mapping of ``gene_id`` to sequence string.

        Entries without a matching gene are added to ``skipped_genes``. If
        the bulk update fails, the whole batch is counted as skipped and the
        error is logged with its traceback instead of being raised.
        """
        # Materialise the queryset once so the count and the objects handed to
        # bulk_update always refer to the same rows (and to save a COUNT query).
        genes = list(Gene.objects.filter(gene_id__in=sequences.keys()))
        for gene in genes:
            gene.sequence = sequences[gene.gene_id]
        try:
            Gene.objects.bulk_update(genes, ['sequence'])
        except Exception:
            # Best effort: keep importing the following chunks, but keep the
            # traceback so the failure can be diagnosed.
            logger.warning("Could not update genes... skipped.", exc_info=True)
            self.skipped_genes += len(sequences)
        else:
            self.updated_genes += len(genes)
            self.skipped_genes += len(sequences) - len(genes)

    def load_all(self, test=False, chunk_size=10000, skip_n_sequences=0):
        """Read the FASTA file and update gene sequences chunk by chunk.

        Args:
            test: when true, stop after the first chunk has been written.
            chunk_size: number of processed entries between two bulk updates.
            skip_n_sequences: number of leading entries to ignore; they are
                counted as both processed and skipped.
        """
        logger.info("Starting %s Gene sequences import (update) to DB", self.CATALOG)
        if skip_n_sequences > 0:
            logger.info("Skipping first %s sequences", skip_n_sequences)
        current_sequences = {}
        for name, seq in pyfastx.Fasta(self.sequence_file, build_index=False):
            if self.processed_genes < skip_n_sequences:
                self.processed_genes += 1
                self.skipped_genes += 1
                continue
            # The gene id is the slugified first whitespace-separated token
            # of the FASTA entry name.
            current_sequences[slugify(name.split()[0])] = seq
            self.processed_genes += 1
            if self.processed_genes % chunk_size == 0:
                self.update_sequences(current_sequences)
                logger.info("%s Gene sequences processed so far...", self.processed_genes)
                current_sequences = {}
                if test:
                    break
        # Flush the last, incomplete chunk.
        if current_sequences:
            self.update_sequences(current_sequences)
        logger.info("[DONE] %s/%s Gene sequences updated.", self.updated_genes, self.processed_genes)
        logger.info("[DONE] %s/%s Genes skipped.", self.skipped_genes, self.processed_genes)
import os
from rest_framework.test import APITestCase
from metagenedb.apps.catalog.models import Gene
from metagenedb.apps.catalog.management.commands.import_igc_sequences import ImportIGCGeneSequences
from metagenedb.apps.catalog.management.commands.commons.import_gene_sequences import ImportGeneSequences
from metagenedb.apps.catalog.factory import (
GeneFactory,
)
......@@ -14,7 +16,7 @@ class TestUpdateSequences(APITestCase):
cls.gene = GeneFactory()
def setUp(self):
self.import_igc_seq = ImportIGCGeneSequences("test") # we never make real reference to the sequence_file
self.import_igc_seq = ImportGeneSequences("test") # we never make real reference to the sequence_file
def test_update_sequence(self):
seq = "ACTG"
......@@ -24,3 +26,27 @@ class TestUpdateSequences(APITestCase):
self.assertFalse(Gene.objects.get(gene_id=self.gene.gene_id).sequence)
self.import_igc_seq.update_sequences(sequences)
self.assertEqual(Gene.objects.get(gene_id=self.gene.gene_id).sequence, seq)
class TestEndToEnd(APITestCase):
    """Run the importer on a small FASTA fixture and check the stored sequences."""

    @classmethod
    def setUpTestData(cls):
        # The importer only updates existing genes, so they are created first.
        GeneFactory.create(gene_id="gene1")
        GeneFactory.create(gene_id="gene2")

    def test_end_to_end(self):
        test_file = os.path.join(os.path.dirname(__file__), "./test_files/genes.fa")
        loader = ImportGeneSequences(test_file)
        expected_genes = {
            'gene1': {
                'sequence': 'ACGT'
            },
            'gene2': {
                'sequence': 'ATCG'
            },
        }
        loader.load_all()
        created_genes = {gene.gene_id: gene for gene in Gene.objects.all()}
        # Guard against a vacuous pass: every expected gene must be present,
        # and nothing else.
        self.assertEqual(set(created_genes), set(expected_genes))
        for gene_id, expected in expected_genes.items():
            self.assertEqual(created_genes[gene_id].sequence, expected['sequence'])
import logging
import pyfastx
from django.core.management.base import BaseCommand
from slugify import slugify
from metagenedb.apps.catalog.models import Gene
from metagenedb.apps.catalog.management.commands.commons.import_gene_sequences import ImportGeneSequences
# Default log record layout: [timestamp] LEVEL:logger-name:message.
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# NOTE(review): a class with this same name is defined again further down
# (as a subclass of ImportGeneSequences) and would shadow this one; confirm
# which definition is meant to remain.
class ImportIGCGeneSequences(object):
    """Update the ``sequence`` field of existing ``Gene`` rows from an IGC FASTA file.

    Genes are matched on ``gene_id``; entries of the file that have no
    matching gene are counted as skipped.
    """

    def __init__(self, sequence_file):
        """Store the path of the FASTA file and zero the counters."""
        self.sequence_file = sequence_file
        self._reset_counters()

    def _reset_counters(self):
        """Set the processed / updated / skipped counters back to zero."""
        self.processed_genes = 0
        self.updated_genes = 0
        self.skipped_genes = 0

    def update_sequences(self, sequences):
        """Write the given sequences to the matching genes in one bulk update.

        Args:
            sequences: mapping of ``gene_id`` to sequence string.

        Entries without a matching gene are added to ``skipped_genes``. If
        the bulk update fails, the whole batch is counted as skipped and the
        error is logged with its traceback instead of being raised.
        """
        # Materialise the queryset once so the count and the objects handed to
        # bulk_update always refer to the same rows (and to save a COUNT query).
        genes = list(Gene.objects.filter(gene_id__in=sequences.keys()))
        for gene in genes:
            gene.sequence = sequences[gene.gene_id]
        try:
            Gene.objects.bulk_update(genes, ['sequence'])
        except Exception:
            # Best effort: keep importing the following chunks, but keep the
            # traceback so the failure can be diagnosed.
            logger.warning("Could not update genes... skipped.", exc_info=True)
            self.skipped_genes += len(sequences)
        else:
            self.updated_genes += len(genes)
            self.skipped_genes += len(sequences) - len(genes)

    def load_all(self, test=False, chunk_size=10000, skip_n_sequences=0):
        """Read the FASTA file and update gene sequences chunk by chunk.

        Args:
            test: when true, stop after the first chunk has been written.
            chunk_size: number of processed entries between two bulk updates.
            skip_n_sequences: number of leading entries to ignore; they are
                counted as both processed and skipped.
        """
        logger.info("Starting IGC Gene sequences import (update) to DB")
        if skip_n_sequences > 0:
            logger.info("Skipping first %s sequences", skip_n_sequences)
        current_sequences = {}
        for name, seq in pyfastx.Fasta(self.sequence_file, build_index=False):
            if self.processed_genes < skip_n_sequences:
                self.processed_genes += 1
                self.skipped_genes += 1
                continue
            # The gene id is the slugified first whitespace-separated token
            # of the FASTA entry name.
            current_sequences[slugify(name.split()[0])] = seq
            self.processed_genes += 1
            if self.processed_genes % chunk_size == 0:
                self.update_sequences(current_sequences)
                logger.info("%s Gene sequences processed so far...", self.processed_genes)
                current_sequences = {}
                if test:
                    break
        # Flush the last, incomplete chunk.
        if current_sequences:
            self.update_sequences(current_sequences)
        logger.info("[DONE] %s/%s Gene sequences updated.", self.updated_genes, self.processed_genes)
        logger.info("[DONE] %s/%s Genes skipped.", self.skipped_genes, self.processed_genes)
class ImportIGCGeneSequences(ImportGeneSequences):
    """Gene sequence importer labelled with the IGC catalog name."""

    # Overrides the catalog label inherited from ImportGeneSequences.
    CATALOG = "IGC"
class Command(BaseCommand):
......
import logging
from django.core.management.base import BaseCommand
from metagenedb.apps.catalog.management.commands.commons.import_gene_sequences import ImportGeneSequences
# Default log record layout: [timestamp] LEVEL:logger-name:message.
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
class ImportVirgoGeneSequences(ImportGeneSequences):
    """Gene sequence importer labelled with the Virgo catalog name."""

    # Overrides the catalog label inherited from ImportGeneSequences.
    CATALOG = "Virgo"
class Command(BaseCommand):
    """Management command that updates Virgo gene sequences from a FASTA file."""

    help = 'Create or update all Virgo gene sequences (from `NT.fasta` file).'

    def add_arguments(self, parser):
        """Declare the positional FASTA path and the optional flags."""
        parser.add_argument(
            'fasta',
            help='NT.fasta file from Virgo. Genes need to exist in DB for this script to work.'
        )
        parser.add_argument('--test', action='store_true', help='Run only on first 10000 sequences.')
        parser.add_argument('--skip_n', type=int, default=0, help='Number of sequence to skip')

    def set_logger_level(self, verbosity):
        """Map the command verbosity to a log level (>2: DEBUG, >1: INFO)."""
        if verbosity > 2:
            level = logging.DEBUG
        elif verbosity > 1:
            level = logging.INFO
        else:
            # Lower verbosity leaves the logging configuration untouched.
            return
        logger.setLevel(level)
        # The import itself logs through the logger of the module that defines
        # ImportGeneSequences, which is not a child of this module's logger;
        # raise its level too so progress messages follow the verbosity.
        logging.getLogger(ImportGeneSequences.__module__).setLevel(level)

    def handle(self, *args, **options):
        """Run the Virgo sequence import with the parsed command options."""
        self.set_logger_level(int(options['verbosity']))
        import_virgo = ImportVirgoGeneSequences(options['fasta'])
        import_virgo.load_all(test=options['test'], skip_n_sequences=options['skip_n'])
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment