diff --git a/rpg/digest.py b/rpg/digest.py
index 586e43c96595360ecb48f9cebb9718c15f058a53..090b65549630f6305e48b86092e37f9878c3380c 100644
--- a/rpg/digest.py
+++ b/rpg/digest.py
@@ -25,6 +25,9 @@
 """Contains class and function needed to perform a digestion"""
 import os
 import random
+import sys
+from multiprocessing import Pool
+from functools import partial
 from rpg import core
 from rpg import rule
 from rpg import sequence
@@ -68,10 +71,6 @@ class ResultOneDigestion:
             return self.__dict__ == other.__dict__
         return False
 
-    # Needed with __eq__ to make it hashable
-    def __hash__(self):
-        return hash(self.__dict__.values())
-
     # Create a clean output according to fmt
     def __format__(self, fmt):
         ret = ""
@@ -409,20 +408,56 @@ def concurrent_digest(seq, enz, aa_pka):
     # it will be one result by enzyme
     return [result]
 
-def digest_from_input(input_data, input_type, enz, mode, aa_pka):
-    """Digest all sequences of input data with
-    selected enzymes and mode.
+def digest_part(offset_start, offset_end, file, enz, mode, aa_pka):
+    """Main parallelized function that digests each sequence of a file
+    within an offset range.
+
+    :param offset_start: where to start taking sequences in the file
+    :param offset_end: where to stop taking sequences in the file
+    :param file: the path of the file to take sequences from
+    :param enz: enzymes to digest with
+    :param mode: digestion mode (concurrent / sequential)
+    :param aa_pka: pKa values (IPC / Stryer)
+    :type offset_start: int
+    :type offset_end: int
+    :type file: str
+    :type enz: list(:py:class:`~rpg.enzyme.Enzyme`)
+    :type mode: str
+    :type aa_pka: str
+    """
+    # Resulting digestions of the current offset range
+    results_digestion = []
+
+    try:
+        # Query each sequence, one by one, in the offset range
+        for header, seq in core.next_read(file, offset_start, offset_end):
+            # Construct the Sequence to digest (remove first char of header)
+            tmp_seq = sequence.Sequence(header[1:], sequence.check_sequence(seq))
+            # Digest it
+            results_digestion.append(digest_one_sequence(tmp_seq, enz, mode,
+                                                         aa_pka))
+    except ValueError as exc:
+        raise exc
+
+    # Return the results of this chunk to the parent process
+    return results_digestion
+
+def digest_from_input(input_data, input_type, enz, mode, aa_pka, nb_proc=1):
+    """Digest all sequences of input data according to selected enzymes
+    and mode. Can be run in parallel using the nb_proc argument.
 
-    :param input_data: either a sequence or the path of a file of sequence (fasta/fastq)
+    :param input_data: either a sequence or the path of a file of sequences (fasta/fastq, gzipped or not)
     :param input_type: either 'sequence' or 'file'
     :param enz: enzymes to digest with
     :param mode: digestion mode (concurrent / sequential)
     :param aa_pka: pKa values (IPC / Stryer)
+    :param nb_proc: number of processes to run in parallel
     :type input_data: str
     :type input_type: str
     :type enz: list(:py:class:`~rpg.enzyme.Enzyme`)
     :type mode: str
     :type aa_pka: str
+    :type nb_proc: int (default: 1)
 
     :return: result of digestions
     :rtype: list(list(:py:class:`ResultOneDigestion`))
@@ -431,59 +466,52 @@ def digest_from_input(input_data, input_type, enz, mode, aa_pka):
     results_digestion = []
     # Input is a file?
     if input_type == "file":
-        with open(input_data) as in_file:
-            header_first_car = in_file.read(1)
-            in_file.seek(0)
-            # Fasta file, can be multi-line
-            if header_first_car == ">":
-                # First header
-                header = in_file.readline().strip()
-                # First line
-                tmp_line = in_file.readline().strip()
-                seq = ""
-                while tmp_line:
-                    if not tmp_line.startswith(">"):
-                        seq += tmp_line
-                        tmp_line = in_file.readline().strip()
-                    else:
-                        # Create a Sequence
-                        tmp_seq = sequence.Sequence(header[1:],
-                                                    sequence.check_sequence(seq))
-                        # Digest sequence
-                        results_digestion.append(digest_one_sequence
-                                                 (tmp_seq, enz, mode, aa_pka))
-                        seq = ""
-                        header = tmp_line
-                        tmp_line = in_file.readline().strip()
-                # Last sequence to digest
-                tmp_seq = sequence.Sequence(header[1:],
-                                            sequence.check_sequence(seq))
-                # Digest it
-                results_digestion.append(digest_one_sequence(tmp_seq, enz,
-                                                              mode, aa_pka))
-            # Fastq file
-            elif header_first_car == "@":
-                header = in_file.readline().strip()
-                while header:
-                    seq = in_file.readline().strip()
-                    tmp_seq = sequence.Sequence(header[1:],
-                                                sequence.check_sequence(seq))
-                    # Digest sequence
-                    results_digestion.append(digest_one_sequence(tmp_seq, enz,
-                                                                 mode, aa_pka))
-                    in_file.readline()
-                    in_file.readline()
-                    header = in_file.readline().strip()
-            else:
-                core.handle_errors("input file format not recognized (%s)." %
-                                   header_first_car, 0, "Input ")
+        # Get the size of the file
+        total_size = os.path.getsize(input_data)
+        # Size of each chunk to read
+        chunk_size = total_size // nb_proc
+        # Starting offset
+        offset_start = 0
+        try:
+            # Create the pool of nb_proc processes
+            pool = Pool(nb_proc)
+            # Partial function to fix all but the two offset arguments
+            prod_digest = partial(digest_part, file=input_data, enz=enz,
+                                  mode=mode, aa_pka=aa_pka)
+            # All tuples of (offset_start, offset_end)
+            all_offsets = []
+            # For each chunk except the last one
+            for _ in range(nb_proc - 1):
+                # Compute the ending offset of this chunk
+                offset_end = offset_start + chunk_size
+                # Add this couple of start/end offsets
+                all_offsets.append((offset_start, offset_end))
+                # The next chunk starts where this one stops
+                offset_start = offset_start + chunk_size
+            # The last chunk runs to the end of the file
+            all_offsets.append((offset_start, total_size))
+
+            # Launch all processes (results is a list of lists)
+            results = pool.starmap(prod_digest, all_offsets)
+        except ValueError as exc:
+            pool.terminate()
+            core.handle_errors(str(exc), 0, "Input ")
+        pool.terminate()
+
+        # Flatten the list of lists into a single list of results
+        for i in results:
+            results_digestion += i
+
     # input is a single sequence
     elif input_type == "sequence":
-        tmp_seq = sequence.Sequence("Input",
-                                    sequence.check_sequence(input_data))
-        # Digest the sequence
-        results_digestion.append(digest_one_sequence(tmp_seq, enz, mode,
-                                                     aa_pka))
+        try:
+            tmp_seq = sequence.Sequence("Input",
+                                        sequence.check_sequence(input_data))
+            # Digest the sequence
+            results_digestion.append(digest_one_sequence(tmp_seq, enz, mode,
+                                                         aa_pka))
+        except ValueError as exc:
+            core.handle_errors(str(exc), 0, "Input ")
     # bad input
     else:
         core.handle_errors("input type not recognized (%s)." %
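
The offset bookkeeping in `digest_from_input` is the heart of the parallelization: the first `nb_proc - 1` chunks get `total_size // nb_proc` bytes each, and the last chunk absorbs the integer-division remainder, so the couples cover every byte of the file exactly once. A standalone sketch of that arithmetic (`chunk_offsets` is a name introduced here for illustration, not part of the patch):

```python
def chunk_offsets(total_size, nb_proc):
    """Mirror of the offset loop in digest_from_input: nb_proc - 1 chunks
    of total_size // nb_proc bytes, plus a final remainder chunk."""
    chunk_size = total_size // nb_proc
    all_offsets = []
    offset_start = 0
    for _ in range(nb_proc - 1):
        all_offsets.append((offset_start, offset_start + chunk_size))
        offset_start += chunk_size
    # The last chunk runs to the end of the file, absorbing the remainder
    all_offsets.append((offset_start, total_size))
    return all_offsets

print(chunk_offsets(1000, 3))  # [(0, 333), (333, 666), (666, 1000)]
```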
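Because chunk boundaries are raw byte offsets, they will usually fall in the middle of a record, so correctness rests on the contract implied for `core.next_read` (whose body is not shown in this patch): each chunk yields only the records whose header line starts inside `[offset_start, offset_end)`, reading past `offset_end` when necessary to finish the last record it owns. Below is a minimal sketch of such a boundary-aware reader under that assumed contract; `next_read_sketch` is a hypothetical stand-in, not rpg's actual implementation, and it handles plain (non-gzipped) FASTA only:

```python
def next_read_sketch(path, offset_start, offset_end):
    """Hypothetical stand-in for core.next_read: yield (header, sequence)
    for each FASTA record whose header line begins inside
    [offset_start, offset_end)."""
    with open(path, "rb") as in_file:  # binary mode: offsets are bytes
        if offset_start != 0:
            # Re-sync on a line boundary: back up one byte and discard the
            # (possibly partial) current line, which the previous chunk owns.
            in_file.seek(offset_start - 1)
            in_file.readline()
        header, seq_parts = None, []
        while True:
            pos = in_file.tell()
            line = in_file.readline()
            if not line:
                break  # end of file
            if line.startswith(b">"):
                if header is not None:
                    # Emit the record accumulated so far
                    yield header, "".join(seq_parts)
                    header = None
                if pos >= offset_end:
                    break  # this record belongs to the next chunk
                header, seq_parts = line.decode().strip(), []
            elif header is not None:
                seq_parts.append(line.decode().strip())
        if header is not None:
            # The last owned record may extend past offset_end, up to EOF
            yield header, "".join(seq_parts)
```

With this ownership rule, adjacent chunks never emit the same record and none is dropped, which is what lets `pool.starmap` digest the `(offset_start, offset_end)` couples independently; the yielded header keeps its leading `>`, matching the `header[1:]` strip in `digest_part`.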