import_igc_data.py 4.89 KB
Newer Older
1
#!/usr/bin/env python
Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
2
3
4
5
6
import argparse
import logging
import os
import sys
from itertools import islice
7
from requests.exceptions import HTTPError
Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
8
9

import django
10
from rest_framework.exceptions import ValidationError
11
from slugify import slugify
Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
12

13
from metagenedb.common.utils.api import MetageneDBCatalogGeneAPI
14
15
from metagenedb.common.utils.parsers import IGCLineParser

Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
16
17
18
19
# Before model import, we need to called django.setup() to Load apps
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "metagenedb.settings")
django.setup()

20
from metagenedb.apps.catalog.models import Gene, Function, Taxonomy  # noqa
21
from metagenedb.apps.catalog.serializers import GeneSerializer  # noqa
Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
22
23
24
25

logging.basicConfig(level=logging.INFO)
_LOGGER = logging.getLogger(__name__)

26
27
PHYLUM_COL = 'taxo_phylum'
GENUS_COL = 'taxo_genus'
28
SELECTED_KEYS = ['gene_id', 'length', 'kegg_ko', PHYLUM_COL, GENUS_COL]
29

Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
30

31
def parse_gene(raw_line, selected_keys=SELECTED_KEYS):
32
    """
33
    Use IGCLineParser and return selected keys
34
    """
35
36
37
38
    gene_parser = IGCLineParser()
    all_dict = gene_parser.gene(raw_line)
    selected_dict = {k: v for k, v in all_dict.items() if k in selected_keys}
    return selected_dict
Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
39
40


41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def select_taxonomy(gene_dict, unknown_val='unknown'):
    """
    Select the taxonomy to be assigned for the gene.
    genus has priority on phylum. If both unknow, remove the taxonomy key
    """
    phylum = gene_dict.pop(PHYLUM_COL)
    genus = gene_dict.pop(GENUS_COL)
    if genus != unknown_val:
        queryset = Taxonomy.objects.filter(name=genus, rank="genus")
        if queryset.count() > 1:
            _LOGGER.warning(f"More than 1 result found for genus {genus}. First result is kept.")
        gene_dict.update(
            {'taxonomy': queryset[0].tax_id}
        )
    elif phylum != unknown_val:
        queryset = Taxonomy.objects.filter(name=phylum, rank="phylum")
        if queryset.count() > 1:
            _LOGGER.warning(f"More than 1 result found for phylum {phylum}. First result is kept.")
        gene_dict.update(
            {'taxonomy': queryset[0].tax_id}
        )
    return gene_dict


65
66
class ImportIGCGenes(object):
    METAGENEDB_GENE_API = MetageneDBCatalogGeneAPI
Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
67

68
69
70
71
72
73
74
    def __init__(self, annotation_file, url, skip_tax=False, skip_functions=False):
        self.annotation_file = annotation_file
        self.url = url
        self.metagenedb_gene_api = self.METAGENEDB_GENE_API(base_url=self.url)
        # Skip some insertion if specified in script options
        self.skip_tax = skip_tax
        self.skip_functions = skip_functions
Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
75

76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
    def _clean_gene(self, gene_dict):
        gene_dict['gene_id'] = slugify(gene_dict['gene_id'])
        if self.skip_tax:
            gene_dict.pop('taxonomy')
        if self.skip_functions:
            gene_dict.pop('functions')
        return gene_dict

    def _upsert_gene(self, gene_dict):
        clean_gene_dict = self._clean_gene(gene_dict)
        try:
            gene_id = clean_gene_dict['gene_id']
            self.metagenedb_gene_api.get(gene_id)  # Try to get obj to check if it exists
            self.metagenedb_gene_api.put(gene_id, clean_gene_dict)
        except HTTPError:
            self.metagenedb_gene_api.post(clean_gene_dict)

    def _insert_gene_list(self, chunk_genes):
        for gene_line in chunk_genes:
            gene_dict = parse_gene(gene_line)
            gene_dict_with_taxo = select_taxonomy(gene_dict)
            try:
                self._upsert_gene(gene_dict_with_taxo)
            except ValidationError as e:
                _LOGGER.warning(f"{e.__dict__} for gene_id: {gene_dict.get('gene_id')}. Insertion skipped.")

    def load_annotation_file_to_db_in_chunks(self, chunk_size=100000):
        processed_genes = 0
        with open(self.annotation_file, 'r') as file:
            while True:
                chunk_genes = list(islice(file, chunk_size))
                if not chunk_genes:
                    break
                processed_genes += len(chunk_genes)
                self._insert_gene_list(chunk_genes)
                _LOGGER.info(f"{processed_genes} genes processed so far...")
        _LOGGER.info(f"[DONE] {processed_genes} genes processed.")
Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
113
114
115
116
117
118
119
120
121


def parse_arguments():
    """
    Defines parser.
    """
    parser = argparse.ArgumentParser(description='Populate database from a given IGC annotation file.')
    # Common arguments for analysis and annotations
    parser.add_argument('annotation', help='IGC annotation file')
122
123
124
    parser.add_argument('url', help='base URL of the instance.', default='http://localhost/')
    parser.add_argument('--skip_taxonomy', action='store_true', help='Skip taxonomy information from genes.')
    parser.add_argument('--skip_functions', action='store_true', help='Skip functions information from genes.')
Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
125
126
127
128
129
130
131
132
133

    try:
        return parser.parse_args()
    except SystemExit:
        sys.exit(1)


def run():
    args = parse_arguments()
134
    load_annotation_file_to_db_in_chunks(args.annotation, args.url)
Kenzo-Hugo Hillion's avatar
Kenzo-Hugo Hillion committed
135
136
137
138


if __name__ == "__main__":
    run()