# create_update_eggnog.py
import logging

from django.core.management.base import BaseCommand
from django.db import IntegrityError

from metagenedb.apps.catalog.models import EggNog, EggNogFunctionalCategory
from metagenedb.common.utils.chunks import file_len
from metagenedb.common.utils.parsers import EggNogAnnotationLineParser

# Configure the root logger's output format (timestamp, level, logger name,
# message) and create this module's logger. The module logger's level is set
# later by Command.set_logger_level() according to the --verbosity option.
logging.basicConfig(format='[%(asctime)s] %(levelname)s:%(name)s:%(message)s')
logger = logging.getLogger(__name__)


class ImportEggNog(object):
    """Create or update ``EggNog`` entries from an annotation file.

    Every line of the file is parsed with ``EggNogAnnotationLineParser``, its
    functional category key is swapped for the matching
    ``EggNogFunctionalCategory`` object, and the result is saved as a new
    ``EggNog`` or applied to the existing one with the same ``function_id``.

    Counters (``processed_count``, ``created_count``, ``updated_count``,
    ``skipped_count``) and ``skipped_ids`` describe the outcome of the run.
    """

    def __init__(self, file_path):
        """Remember the annotation file and reset all run counters.

        Args:
            file_path: Path of the annotation file to import.
        """
        self.annotation_file = file_path
        self.eggnog_parser = EggNogAnnotationLineParser()
        # Filled in by load_all(); defined up front so the attributes exist
        # even when helper methods are called before an import is started.
        self.functional_cat = {}
        self.total_eggnog_nb = 0
        self.processed_count = 0
        self.created_count = 0
        self.updated_count = 0
        self.skipped_count = 0
        self.skipped_ids = []

    def _build_functional_category_dict(self):
        """Cache every functional category in ``self.functional_cat``.

        The cache maps ``category_id`` to the category object so that each
        annotation line can be linked without an extra query.

        Raises:
            Exception: If no functional category exists yet.
        """
        all_categories = EggNogFunctionalCategory.objects.all()
        if not all_categories:
            raise Exception("You need to create Functional categories first.")
        self.functional_cat = {cat.category_id: cat for cat in all_categories}

    def link_functional_category(self, eggnog_dict):
        """Replace the category key of ``eggnog_dict`` by the cached object.

        The dict is modified in place. A missing ``'functional_category'`` key
        falls back to ``'S'``; a key that is not in the cache yields ``None``.

        Args:
            eggnog_dict: Parsed annotation line.
        """
        cat_key = eggnog_dict.get('functional_category', 'S')
        eggnog_dict['functional_category'] = self.functional_cat.get(cat_key)

    def _create_or_update(self, payload):
        """Save ``payload`` as a new entry, or update the existing one.

        Increments exactly one of ``created_count``, ``updated_count`` or
        ``skipped_count``; skipped entries are also recorded in
        ``skipped_ids``.
        """
        function_id = payload.get('function_id')
        try:
            EggNog(**payload).save()
        except IntegrityError:
            # Creation was rejected by the database, presumably because an
            # entry with this function_id already exists: fall through and
            # try to update it instead.
            pass
        else:
            self.created_count += 1
            return

        try:
            eggnog = EggNog.objects.get(function_id=function_id)
            for field, value in payload.items():
                setattr(eggnog, field, value)
            eggnog.save()
        except (IntegrityError, EggNog.DoesNotExist):
            # DoesNotExist means the IntegrityError above was not caused by a
            # function_id clash (e.g. a missing/empty id). Record the line as
            # skipped rather than aborting the whole import.
            self.skipped_ids.append(function_id)
            self.skipped_count += 1
        else:
            self.updated_count += 1

    def load_all(self, test=False):
        """Import every line of the annotation file.

        Args:
            test: When true, stop as soon as ``processed_count`` reaches a
                multiple of 1000 (i.e. after the first 1000 lines of a fresh
                run).

        Raises:
            Exception: If no functional category exists yet.
        """
        self._build_functional_category_dict()
        self.total_eggnog_nb = file_len(self.annotation_file)
        with open(self.annotation_file, "r") as file:
            for line in file:
                eggnog_dict = self.eggnog_parser.get_dict(line)
                self.link_functional_category(eggnog_dict)
                # Empty strings are dropped so they are not written to the model.
                payload = {k: v for k, v in eggnog_dict.items() if v != ""}
                self._create_or_update(payload)
                self.processed_count += 1
                if self.processed_count % 1000 == 0:
                    logger.info("%s/%s EggNog processed so far...", self.processed_count, self.total_eggnog_nb)
                    if test:
                        break
        logger.info("[DONE] %s/%s EggNog created.", self.created_count, self.total_eggnog_nb)
        logger.info("[DONE] %s/%s EggNog updated.", self.updated_count, self.total_eggnog_nb)
        logger.info("[DONE] %s/%s EggNog skipped. List: %s", self.skipped_count, self.total_eggnog_nb,
                    self.skipped_ids)


class Command(BaseCommand):
    """Management command that runs ``ImportEggNog`` on an annotation file."""

    help = 'Create or update all Eggnog entries from annotations.tsv file.'

    def add_arguments(self, parser):
        """Declare the positional annotation file and the ``--test`` flag."""
        parser.add_argument(
            'annotation',
            help='annotations.tsv file from EggNog',
        )
        parser.add_argument(
            '--test',
            action='store_true',
            help='Run only on first 1000 entries.',
        )

    def set_logger_level(self, verbosity):
        """Raise this module's log detail for verbosity 2 (INFO) and 3+ (DEBUG).

        Lower verbosity values leave the logger level untouched.
        """
        if verbosity > 2:
            level = logging.DEBUG
        elif verbosity > 1:
            level = logging.INFO
        else:
            return
        logger.setLevel(level)

    def handle(self, *args, **options):
        """Set the log level from the options, then run the import."""
        verbosity = int(options['verbosity'])
        self.set_logger_level(verbosity)
        importer = ImportEggNog(options['annotation'])
        importer.load_all(test=options['test'])