Commit dae3e53a authored by Nicolas MAILLET

Add parallel execution

parent 2c070bd1
@@ -25,6 +25,9 @@
"""Contains class and function needed to perform a digestion"""
import os
import random
import sys
from multiprocessing import Pool
from functools import partial
from rpg import core
from rpg import rule
from rpg import sequence
@@ -68,10 +71,6 @@ class ResultOneDigestion:
return self.__dict__ == other.__dict__
return False
# Needed with __eq__ to make it hashable
def __hash__(self):
return hash(self.__dict__.values())
# Create a clean output according to fmt
def __format__(self, fmt):
ret = ""
@@ -409,20 +408,56 @@ def concurrent_digest(seq, enz, aa_pka):
# it will be one result by enzyme
return [result]
def digest_from_input(input_data, input_type, enz, mode, aa_pka):
"""Digest all sequences of input data with
selected enzymes and mode.
def digest_part(offset_start, offset_end, file, enz, mode, aa_pka):
""" Main parallelized function that digest each sequence of a file
in an offset range.
:param offset_start: where to start taking sequences in the file
:param offset_end: where to stop taking sequences in the file
    :param file: the path of the file to take sequences from
:param enz: enzymes to digest with
:param mode: digestion mode (concurrent / sequential)
:param aa_pka: pKa values (IPC / Stryer)
:type offset_start: int
:type offset_end: int
    :type file: str
:type enz: list(:py:class:`~rpg.enzyme.Enzyme`)
:type mode: str
:type aa_pka: str
"""
# Resulting digestions of current offset range
results_digestion = []
try:
# Query each sequence, one by one, in the offset range
for header, seq in core.next_read(file, offset_start, offset_end):
# Construct the Sequence to digest (remove first char of header)
tmp_seq = sequence.Sequence(header[1:], sequence.check_sequence(seq))
# Digest it
results_digestion.append(digest_one_sequence(tmp_seq, enz, mode,
aa_pka))
    except ValueError as exc:
        # Let malformed-sequence errors propagate up to Pool.starmap
        raise exc
    # Return the results of this offset range
return results_digestion
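# Illustrative sketch (not from the commit): digest a whole file in a single
# chunk by handing digest_part its full byte range. The file name and the
# empty enzyme list are hypothetical placeholders.
#
#     size = os.path.getsize("proteins.fasta")
#     enzymes = []  # fill with rpg.enzyme.Enzyme objects
#     results = digest_part(0, size, "proteins.fasta", enzymes,
#                           "sequential", "IPC")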
def digest_from_input(input_data, input_type, enz, mode, aa_pka, nb_proc=1):
"""Digest all sequences of input data according to selected enzymes
    and mode. Can be done in parallel using the nb_proc argument.
:param input_data: either a sequence or the path of a file of sequence (fasta/fastq)
    :param input_data: either a sequence or the path of a file of sequences (fasta/fastq, gzipped or not)
:param input_type: either 'sequence' or 'file'
:param enz: enzymes to digest with
:param mode: digestion mode (concurrent / sequential)
:param aa_pka: pKa values (IPC / Stryer)
    :param nb_proc: number of processes to run in parallel
:type input_data: str
:type input_type: str
:type enz: list(:py:class:`~rpg.enzyme.Enzyme`)
:type mode: str
:type aa_pka: str
:type nb_proc: int (default: 1)
:return: result of digestions
:rtype: list(list(:py:class:`ResultOneDigestion`))
@@ -431,59 +466,52 @@ def digest_from_input(input_data, input_type, enz, mode, aa_pka):
results_digestion = []
# Input is a file?
if input_type == "file":
with open(input_data) as in_file:
header_first_car = in_file.read(1)
in_file.seek(0)
# Fasta file, can be multi-line
if header_first_car == ">":
# First header
header = in_file.readline().strip()
# First line
tmp_line = in_file.readline().strip()
seq = ""
while tmp_line:
if not tmp_line.startswith(">"):
seq += tmp_line
tmp_line = in_file.readline().strip()
else:
# Create a Sequence
tmp_seq = sequence.Sequence(header[1:],
sequence.check_sequence(seq))
# Digest sequence
results_digestion.append(digest_one_sequence
(tmp_seq, enz, mode, aa_pka))
seq = ""
header = tmp_line
tmp_line = in_file.readline().strip()
# Last sequence to digest
tmp_seq = sequence.Sequence(header[1:],
sequence.check_sequence(seq))
# Digest it
results_digestion.append(digest_one_sequence(tmp_seq, enz,
mode, aa_pka))
# Fastq file
elif header_first_car == "@":
header = in_file.readline().strip()
while header:
seq = in_file.readline().strip()
tmp_seq = sequence.Sequence(header[1:],
sequence.check_sequence(seq))
# Digest sequence
results_digestion.append(digest_one_sequence(tmp_seq, enz,
mode, aa_pka))
in_file.readline()
in_file.readline()
header = in_file.readline().strip()
else:
core.handle_errors("input file format not recognized (%s)." %
header_first_car, 0, "Input ")
# Get the size of the file
total_size = os.path.getsize(input_data)
        # Byte size of the chunk each process will read
chunk_size = total_size // nb_proc
# Starting offset
offset_start = 0
try:
            # Create the pool of worker processes
            pool = Pool(nb_proc)
            # Partial function to fix all but the first two arguments
            prod_digest = partial(digest_part, file=input_data, enz=enz,
                                  mode=mode, aa_pka=aa_pka)
# All tuples of offset_start, offset_end
all_offsets = []
            # For each process/chunk except the last one
for _ in range(nb_proc - 1):
# Compute the ending offset for this chunk
offset_end = offset_start + chunk_size
            # Add this pair of start/end offsets
all_offsets.append((offset_start, offset_end))
            # The next chunk starts where this one stops
offset_start = offset_start + chunk_size
# Add the last chunk
all_offsets.append((offset_start, total_size))
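            # e.g. with total_size=103 and nb_proc=4: chunk_size is 25 and
            # all_offsets is [(0, 25), (25, 50), (50, 75), (75, 103)]; the
            # last chunk absorbs the remainder of the integer division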
            # Launch all processes (results is a list of lists)
results = pool.starmap(prod_digest, all_offsets)
except ValueError as exc:
pool.terminate()
core.handle_errors(str(exc), 0, "Input ")
pool.terminate()
        # Flatten the per-chunk results into a single list
for i in results:
results_digestion += i
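        # e.g. a starmap result [[seq1_res, seq2_res], [seq3_res]] is
        # flattened to [seq1_res, seq2_res, seq3_res], one entry per sequence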
    # Input is a single sequence
elif input_type == "sequence":
tmp_seq = sequence.Sequence("Input",
sequence.check_sequence(input_data))
# Digest the sequence
results_digestion.append(digest_one_sequence(tmp_seq, enz, mode,
aa_pka))
try:
tmp_seq = sequence.Sequence("Input",
sequence.check_sequence(input_data))
# Digest the sequence
results_digestion.append(digest_one_sequence(tmp_seq, enz, mode,
aa_pka))
except ValueError as exc:
core.handle_errors(str(exc), 0, "Input ")
# bad input
else:
core.handle_errors("input type not recognized (%s)." %
......
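A minimal usage sketch of the new nb_proc argument, assuming a FASTA file
path and an already built enzyme list (both hypothetical here; the mode and
pKa literals follow the docstring):

    from rpg import digest

    enzymes = []  # fill with rpg.enzyme.Enzyme objects
    results = digest.digest_from_input("proteins.fasta", "file", enzymes,
                                       "sequential", "IPC", nb_proc=4)
    # results: one list of ResultOneDigestion objects per input sequence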