Commit 6f9f4bf8 authored by Bertrand  NÉRON's avatar Bertrand NÉRON
Browse files

refactor fasta_reader to put open file outside function

parent fb1db00b
......@@ -5,57 +5,58 @@ from itertools import groupby
Sequence = namedtuple("Sequence", "id comment sequence")
def fasta_iter(fasta_path):
"""
:param fasta_file: the file containing all input sequences in fasta format.
:type fasta_file: file object
:author: http://biostar.stackexchange.com/users/36/brentp
:return: for a given fasta file, it returns an iterator which yields tuples
def fasta_iter(fasta_file):
"""
:param fasta_file: the file containing all input sequences in fasta format.
:type fasta_file: file object
:author: http://biostar.stackexchange.com/users/36/brentp
:return: for a given fasta file, it returns an iterator which yields tuples
(string id, string comment, int sequence length)
:rtype: iterator
"""
with open(fasta_path) as fasta_file:
# ditch the boolean (x[0]) and just keep the header or sequence since
# we know they alternate.
group = (x[1] for x in groupby(fasta_file , lambda line: line[0] == ">"))
for header in group:
# drop the ">"
header = header.next()[1:].strip()
header = header.split()
_id = header[0]
comment = ' '.join(header[1:])
seq = ''.join(s.strip() for s in group.next())
yield Sequence(_id, comment, seq)
:rtype: iterator
"""
# ditch the boolean (x[0]) and just keep the header or sequence since
# we know they alternate.
group = (x[1] for x in groupby(fasta_file , lambda line: line[0] == ">"))
for header in group:
# drop the ">"
header = header.next()[1:].strip()
header = header.split()
_id = header[0]
comment = ' '.join(header[1:])
seq = ''.join(s.strip() for s in group.next())
yield Sequence(_id, comment, seq)
def fasta_writer(sequence, fasta_path):
def fasta_writer(sequence, output_file):
"""
write a sequence in a file in fasta format
:param sequence: the sequence to print
:type sequence: Sequence instance
:param fasta_path: the path to the file to print the sequence in
:type fasta_path: string
:param v: the file to print the sequence in
:type output_file: file object
"""
print "appel de fasta_writer ",sequence.id, " ",fasta_path
with open(fasta_path, 'w') as output:
output.write('>{0.id} {0.comment}\n'.format(seq))
start = 0
while start < len(seq.sequence):
end = start + 80
print start, " : ", end
output.write(seq.sequence[start: end + 1] + '\n')
start = end
output_file.write('>{0.id} {0.comment}\n'.format(seq))
start = 0
while start < len(seq.sequence):
end = start + 80
print start, " : ", end
output_file.write(seq.sequence[start: end + 1] + '\n')
start = end
if __name__ == '__main__':
if len(sys.argv) != 2:
sys.exit("usage: fasta_filter path/to/fasta/file/to/read")
input_path = sys.argv[1]
for seq in fasta_iter(input_path):
if seq.sequence.startswith('M') and seq.sequence.count('W') > 6:
if os.path.exists(seq.id):
print >> sys.stderr , "file {0} already exist: sequence {0} skipped".format(seq.id)
continue
else:
output_fasta = seq.id + ".fa"
fasta_writer(seq, output_fasta)
\ No newline at end of file
with open(input_path, 'r') as input_file:
for seq in fasta_iter(input_path):
if seq.sequence.startswith('M') and seq.sequence.count('W') > 6:
if os.path.exists(seq.id):
print >> sys.stderr , "file {0} already exist: sequence {0} skipped".format(seq.id)
continue
else:
output_fasta = seq.id + ".fa"
with open(output_path, 'w') as output_file:
fasta_writer(seq, output_file)
\ No newline at end of file
......@@ -3,7 +3,7 @@ from itertools import groupby
Sequence = namedtuple("Sequence", "id comment sequence")
def fasta_iter(fasta_path):
def fasta_iter(fasta_file):
"""
:param fasta_file: the file containing all input sequences in fasta format.
:type fasta_file: file object
......@@ -30,4 +30,9 @@ def fasta_iter(fasta_path):
#f.next()
#or
# for seq in fasta_iter('seq.fasta'):
# do something with seq
\ No newline at end of file
# do something with seq
#The problem with this implementation is
# something goes wrong in do something with seq
# but we don't quit the program (we catch the exception for instance)
# the fasta file is still open
# it's better to put the fasta file opening out the fasta reader see fasta filter
\ No newline at end of file
......@@ -21,4 +21,4 @@ def fasta_reader(fasta_path):
in_sequence = True
else:
sequence += line.strip()
return Sequence(id_ , comment, sequence)
\ No newline at end of file
return Sequence(id_ , comment, sequence)
\ No newline at end of file
......@@ -2,7 +2,7 @@ from collections import namedtuple
Sequence = namedtuple("Sequence", "id comment sequence")
def fasta_reader(fasta_path):
def fasta_reader(fasta_file):
"""
:param fasta_path: the path to the file to parse
:type fasta_path: string
......@@ -10,22 +10,24 @@ def fasta_reader(fasta_path):
:rtype: list of Sequence
"""
sequences = []
with open(fasta_path, 'r') as fasta_infile:
id_ = ''
comment = ''
sequence = ''
for line in fasta_infile:
if line.startswith('>'):
# a new sequence begin
if id_ != '':
# a sequence was already parsed so add it to the list
sequences.append(Sequence(id_ , comment, sequence))
sequence = ''
header = line.split()
id_ = header[0]
comment = ' '.join(header[1:])
else:
sequence += line.strip()
#append the last sequence of the file to the list
sequences.append(Sequence(id_ , comment, sequence))
return sequences
\ No newline at end of file
id_ = ''
comment = ''
sequence = ''
for line in fasta_infile:
if line.startswith('>'):
# a new sequence begin
if id_ != '':
# a sequence was already parsed so add it to the list
sequences.append(Sequence(id_ , comment, sequence))
sequence = ''
header = line.split()
id_ = header[0]
comment = ' '.join(header[1:])
else:
sequence += line.strip()
return Sequence(id_ , comment, sequence)
# if we open the file in the fasta reader we are forced
# to read all the sequences and charge them in memory which can take huge space
# it's better to read sequences one by one and treat it as one is ready.
# see fasta_filter.py
\ No newline at end of file
from matrix import *
import matrix
lines = iter( [' A G C U\n',
'A 1.0 0.5 0.0 0.0\n',
......@@ -11,14 +11,14 @@ def parse_similarity_file():
"""
parse file containing RNA similarity matrix and return a matrix
"""
sim_matrix = matrix_maker(4, 4)
sim_matrix = matrix.create(4, 4)
#skip first line
lines.next()
for row_no, line in enumerate(lines):
line = line.strip()
fields = line.split()
values = [float(val) for val in fields[1:]]
matrix_replace_row(sim_matrix, row_no, values)
matrix.replace_row(sim_matrix, row_no, values)
return sim_matrix
def get_similarity(b1, b2, sim_matrix):
......@@ -39,7 +39,7 @@ def get_similarity(b1, b2, sim_matrix):
raise KeyError("unknown base b1: " + str(b1))
if not b2 in bases:
raise KeyError("unknown base b2: " + str(b2))
return matrix_get_cell(sim_matrix, bases[b1], bases[b2])
return matrix.get_cell(sim_matrix, bases[b1], bases[b2])
def compute_similarity(seq1, seq2, sim_matrix):
"""
......@@ -63,7 +63,7 @@ if __name__ == '__main__':
seq1 = 'AGCAUCUA'
seq2 = 'ACCGUUCU'
sim_matrix = parse_similarity_file()
print matrix_to_str(sim_matrix)
print matrix.to_str(sim_matrix)
similarity = compute_similarity(seq1, seq2, sim_matrix)
print similarity
\ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment