Select Git revision
sequence.py
parser.py 11.38 KiB
#! /usr/bin/env python
#-*- coding: utf-8 -*-
"""
Created on 27 dec. 2011
@author: Bertrand Néron
"""
from __future__ import print_function
from collections import namedtuple
from couchdbkit.client import Server
from couchdbkit.exceptions import ResourceNotFound
from couchdbkit.resource import CouchdbResource
from couchdbkit.schema import Document
from couchdbkit.schema.properties import *
from restkit import Resource, BasicAuth
import restkit.errors
def replicon_parser(replicon_data):
    """
    Parse a tab-separated file containing the information about replicons.

    Lines starting with '#' are comments and blank lines are skipped.
    Every other line must provide at least 6 fields, in this order:
    name, ncbi_id, taxid (integer), strain, taxonomy, type.
    The taxonomy field is split on '; ' and a trailing '.' or ';' is removed
    from its last term.

    :param replicon_data: the path of replicon information file
    :type replicon_data: string
    :return: a dict containing Replicon_info as values and Replicon name (field 0) as key
    :rtype: dict
    :raise KeyError: if the same replicon name appears on more than one line
    :raise Exception: if a line cannot be parsed (missing field, non-integer taxid, ...)
    """
    replicon_db = {}
    Replicon_info = namedtuple('Replicon_info', ('name', 'ncbi_id', 'taxid', 'strain', 'taxonomy', 'type'))
    with open(replicon_data, 'r') as replicon_file:
        for line_nb, line in enumerate(replicon_file, 1):
            if line.startswith('#'):
                continue
            line = line.strip()
            if not line:
                # a blank line (typically at the end of the file) carries no record
                continue
            fields = line.split('\t')
            if fields[0] in replicon_db:
                raise KeyError("duplicate replicon:" + fields[0])
            try:
                replicon_id = fields[0]
                ncbi_id = fields[1]
                taxid = int(fields[2])
                strain = fields[3]
                taxonomy = fields[4].split('; ')
                # remove ending dot or semi-colon from the last term of taxonomy;
                # only the last term is modified, the taxonomy stays a list
                if taxonomy[-1].endswith(('.', ';')):
                    taxonomy[-1] = taxonomy[-1][:-1]
                replicon_type = fields[5]
                replicon_db[replicon_id] = Replicon_info(replicon_id, ncbi_id, taxid, strain,
                                                         taxonomy, replicon_type)
            except Exception as err:
                raise Exception("Error during parsing line {0}: {1} : {2}".format(line_nb, line, err))
    return replicon_db
def _dash_to_none(value, convert=None):
    """Return None for the '-' placeholder, else *value* (passed through *convert* if given)."""
    if value == '-':
        return None
    if convert is None:
        return value
    return convert(value)


def _to_float(value):
    """Convert a number to float; in old data, float number use , instead of dot."""
    return float(value.replace(',', '.'))


def system_parser(system_data):
    """
    Parse a tab-separated file describing the genes of the secretion systems.

    Lines starting with '#' are comments and blank lines are skipped.
    Every other line describes one gene with 18 fields, in this order:
    gene code, gene id, protein length, strand, begin, end, match, score,
    i_evalue, coverage, match_begin, match_end, replicon id, predicted system,
    system id, system status, gene name, description.
    A '-' in an optional field, or an empty gene name / description, is stored as None.
    The genes are grouped by system id; the predicted system, status and replicon
    of a system are taken from the first line seen for this system.

    :param system_data: the path of secretion system information file
    :type system_data: string
    :return: a mapping with system-id as keys and System_info as values;
             the genes attribute of a System_info is a dict {gene code: Gene}
    :rtype: dict
    :raise KeyError: if a gene code is found twice in the same system
    :raise RuntimeError: if a line cannot be parsed (missing field, bad number, empty system id)
    """
    system_db = {}
    System_info = namedtuple('System_info', 'code, predicted_system, system_status, replicon, genes')
    Gene = namedtuple('Gene',
                      ('code', 'id', 'protein_length', 'strand', 'begin', 'end', 'match',
                       'score', 'i_evalue', 'coverage', 'match_begin', 'match_end', 'name', 'description')
                      )
    with open(system_data, 'r') as system_file:
        for line_nb, raw_line in enumerate(system_file, 1):
            if raw_line.startswith('#'):
                continue
            if not raw_line.strip():
                # a blank line (typically at the end of the file) carries no record
                continue
            # Strip surrounding spaces and line terminators but NOT tabs:
            # trailing tabs delimit empty gene name / description fields,
            # removing them would shift or drop these fields.
            line = raw_line.strip(' \r\n\f\v')
            fields = line.split('\t')
            gene_code = fields[0]
            # NOTE(review): system_db is keyed by system id, not by gene code, and the
            # message mentions a replicon; this check is kept as is -- confirm its intent.
            if gene_code in system_db:
                raise KeyError("duplicate replicon:" + fields[0])
            try:
                gene_id = fields[1]
                protein_length = int(fields[2])
                strand = _dash_to_none(fields[3])
                begin = _dash_to_none(fields[4], int)
                end = _dash_to_none(fields[5], int)
                match = _dash_to_none(fields[6])
                score = _dash_to_none(fields[7], _to_float)
                i_evalue = _dash_to_none(fields[8], _to_float)
                coverage = _dash_to_none(fields[9], _to_float)
                match_begin = _dash_to_none(fields[10], int)
                match_end = _dash_to_none(fields[11], int)
                replicon_id = fields[12]
                predicted_system = _dash_to_none(fields[13])
                system_id = fields[14]
                if system_id == '-':
                    raise RuntimeError("System-Id is empty")
                system_status = _dash_to_none(fields[15])
                gene_name = fields[16] or None
                description = fields[17] or None
            except Exception as err:
                raise RuntimeError("Error during parsing line {0}: {1} : {2}".format(line_nb, line, err))
            gene = Gene(gene_code,
                        gene_id,
                        protein_length,
                        strand,
                        begin,
                        end,
                        match,
                        score,
                        i_evalue,
                        coverage,
                        match_begin,
                        match_end,
                        gene_name,
                        description
                        )
            if system_id in system_db:
                if gene.code in system_db[system_id].genes:
                    raise KeyError("duplicate gene: replicon= {0}; gene= {1};".format(replicon_id, gene.code))
                # append this gene to System_info genes
                system_db[system_id].genes[gene.code] = gene
            else:
                # create a new System_info entry
                system_db[system_id] = System_info(system_id, predicted_system, system_status,
                                                   replicon_id, genes={gene.code: gene})
    return system_db
class SecretionSystem(Document):
    """
    couchdbkit Document describing one secretion system, as stored in couchdb.
    """
    # code of the secretion system; the only mandatory property
    code = StringProperty(required=True)
    # kind of system that was predicted; optional
    predicted_system = StringProperty()
    # description of the replicon, stored as a dict
    replicon = DictProperty()
    # list of the genes belonging to the system
    genes = ListProperty()
def fill_db(server_uri, db_name, user, passwd, replicon_db, system_db, force_update=False):
    """
    Save one SecretionSystem document per entry of system_db in a couchdb data base.

    The data base is created if it does not exist yet. Systems are saved in
    sorted order of their code, and the genes of a system are stored sorted by
    gene code. Gene attributes whose value is None are left out of the document.
    The system_status of a system is not stored.

    :param server_uri: the url of the couchdb server (with port)
    :type server_uri: string
    :param db_name: the name of the db in the couchdb server
    :type db_name: string
    :param user: a login representing a user who is granted to modify the DB
    :type user: string
    :param passwd: the password that allow to authenticate the user
    :type passwd: string
    :param replicon_db: the set of replicons info as return by replicon_parser
    :type replicon_db: dict
    :param system_db: the set of secretion systems info as return by system_parser
    :type system_db: dict
    :param force_update: if true force the entry to be updated even if the _rev number is not provided
    :type force_update: boolean
    :raise KeyError: if a system refers to a replicon which is not in replicon_db
    """
    auth = BasicAuth(user, passwd)
    resource = CouchdbResource(server_uri, filters=[auth])
    server = Server(resource_instance=resource)
    secreton_db = server.get_or_create_db(db_name)
    # sorted() works on python 2 and 3, whereas dict.keys() cannot be sorted in place on python 3
    for syst_code in sorted(system_db):
        system = system_db[syst_code]
        try:
            replicon = replicon_db[system.replicon]
        except KeyError:
            raise KeyError("unknown replicon '{0}' referenced by system '{1}'".format(system.replicon,
                                                                                      syst_code))
        secretion_system = SecretionSystem()
        # the system code is also used as the couchdb document id
        secretion_system._id = system.code
        secretion_system.code = system.code
        secretion_system.predicted_system = system.predicted_system
        secretion_system.replicon = {'name': replicon.name,
                                     'ncbi_id': replicon.ncbi_id,
                                     'taxid': replicon.taxid,
                                     'strain': replicon.strain,
                                     'taxonomy': replicon.taxonomy,
                                     'type': replicon.type
                                     }
        genes = []
        for gene_code in sorted(system.genes):
            gene = system.genes[gene_code]
            g = {}
            for field in gene._fields:
                value = getattr(gene, field)
                # undefined values are not stored in the document
                if value is not None:
                    g[field] = value
            genes.append(g)
        secretion_system.genes = genes
        secreton_db.save_doc(secretion_system, force_update=force_update)
if __name__ == '__main__':
    import argparse
    import sys
    import getpass

    # raw_input exists only on python 2, input is its python 3 equivalent
    try:
        read_input = raw_input
    except NameError:
        read_input = input

    def get_credentials():
        """
        ask interactively for a login and a password (the password is not echoed)

        :return: the login and the password
        :rtype: tuple of 2 strings
        """
        user = read_input('login: ')
        password = getpass.getpass('password: ')
        return user, password

    usage = """
%(prog)s [options]
parse a file containing replicon informations and a file containing system informations
and fill a couchDB data base with these informations
"""
    parser = argparse.ArgumentParser(usage=usage)
    server_opt = parser.add_argument_group(title="Server Options")
    server_opt.add_argument("-S", "--server",
                            action="store",
                            dest="server_url",
                            help="the url of the couchDB server (with the port)")
    server_opt.add_argument("-d", "--database",
                            action="store",
                            dest="db_name",
                            help="the name of the data base")
    parsing_opt = parser.add_argument_group(title="Parsing Options")
    parsing_opt.add_argument("-r", "--replicon",
                             action="store",
                             dest="replicon_path",
                             help="the path to the replicon file to parse")
    parsing_opt.add_argument("-s", "--system",
                             action="store",
                             dest="system_path",
                             help="the path to the system secretion file to parse")
    parsing_opt.add_argument("-f", "--force_update",
                             action="store_true",
                             dest="force_update",
                             default=False,
                             help="insert document even if there is already a document with the same id (replace it)")
    args = parser.parse_args()

    # all these options are mandatory; they are checked in this order and the
    # first missing one stops the program with exit status 1
    mandatory_options = (
        ('server_url', "You must specify a server url"),
        ('db_name', "You must specify a data base name"),
        ('replicon_path', "You must specify the path to the replicon information file"),
        ('system_path', "You must specify the path to the secretion system information file"),
    )
    for dest, message in mandatory_options:
        if not getattr(args, dest):
            print(message, file=sys.stderr)
            parser.print_help(sys.stderr)
            sys.exit(1)

    replicon_db = replicon_parser(args.replicon_path)
    system_db = system_parser(args.system_path)

    # the user is allowed 3 attempts to authenticate
    try_again = 0
    while True:
        user, password = get_credentials()
        try:
            fill_db(args.server_url, args.db_name, user, password,
                    replicon_db, system_db, force_update=args.force_update)
            break
        except restkit.errors.Unauthorized as err:
            try_again += 1
            if try_again > 2:
                sys.exit("Authentication failure")
            print("Authentication failed, please try again", file=sys.stderr)
        except Exception as err:
            # report the reason of the failure instead of exiting silently
            print("Error while filling the data base: {0}".format(err), file=sys.stderr)
            sys.exit(2)