diff --git a/bin/split_merged.py b/bin/split_merged.py index 39b18dd262bd12f47ddbb99ee399f9950c544700..1d99ae47ee88f7ac8f16a0b6b1591dd570c993e8 100755 --- a/bin/split_merged.py +++ b/bin/split_merged.py @@ -56,7 +56,11 @@ def main(): logging.info("Trying to use %s processors (mode %s).", nprocs, mode) pool = mp.Pool(processes=nprocs) computations = [ - pool.apply_async(split_fasta, args=(in_fname, out_dir)) + pool.apply_async( + split_fasta, args=( + in_fname, + cell_id := in_fname.name[:-len("_merged.fasta")], + out_dir.joinpath(cell_id))) for in_fname in in_fnames] report_list = [comp.get() for comp in computations] elif mode == "dask": @@ -64,13 +68,15 @@ def main(): dask.config.set(scheduler="processes") computations = [] for in_fname in in_fnames: + cell_id = in_fname.name[:-len("_merged.fasta")] computations.append( - dask.delayed(split_fasta)(in_fname, out_dir)) + dask.delayed(split_fasta)(in_fname, cell_id, out_dir.joinpath(cell_id))) [report_list] = dask.compute([*computations], num_workers=nprocs) else: report_list = [] for in_fname in in_fnames: - report_list.append(split_fasta(in_fname, out_dir)) + cell_id = in_fname.name[:-len("_merged.fasta")] + report_list.append(split_fasta(in_fname, cell_id, out_dir.joinpath(cell_id))) logging.info("Finished computations") with open(out_dir.joinpath("sequence_numbers.tsv"), "w") as fh: for reports in report_list: fh.write("".join(reports)) diff --git a/libclusterseq/__init__.py b/libclusterseq/__init__.py index 141ff871a548eafbe9a4f585c5817d75bcf10304..3add4d50a8ccdeb0f8759b9eeae536c058ab4676 100644 --- a/libclusterseq/__init__.py +++ b/libclusterseq/__init__.py @@ -1,6 +1,9 @@ __copyright__ = "Copyright (C) 2020 Blaise Li" __licence__ = "GNU GPLv3" -__version__ = 0.1 +__version__ = 0.2 from .libclusterseq import ( + nx_version, + parasail_version, + sp_version, split_fasta, ) diff --git a/libclusterseq/libclusterseq.py b/libclusterseq/libclusterseq.py index 99137945ffd33c15c7b1332f48bffe16e312a1ae..f0b1a69f69fc362c61fd195ed30e1d961a500085 100644 --- 
a/libclusterseq/libclusterseq.py +++ b/libclusterseq/libclusterseq.py @@ -17,25 +17,25 @@ This library provides utilities to partition reads from a fastq file into several files containing subgroups of those reads based on similarity. """ -import sys # import warnings import logging from collections import defaultdict from itertools import chain from operator import itemgetter -from pathlib import Path -import multiprocessing as mp +# from pathlib import Path from matplotlib import pyplot as plt import seaborn as sns from numpy import fromiter, linspace from scipy.stats import gaussian_kde +from scipy import __version__ as sp_version from networkx import ( Graph, complement, connected_components, set_node_attributes) -import dask +from networkx import __version__ as nx_version # This may cause issues in some parallel settings # from mappy import fastx_read # from .libbamutils import ali2fq, bam2fastq import parasail +from parasail import __version__ as parasail_version # To silence annoying matlotlib logs: @@ -285,21 +285,19 @@ def write_fasta(records, fname): fasta_out.write(f">{name}\n{seq}\n") -def split_fasta(in_fname, out_dir): +def split_fasta(in_fname, cell_id, cell_dir): """ Split fasta file *in_fname* based on nucleotide sequence similarities. - The resulting fasta files are written in a subdirectory <cell_id> of - *out_dir* where <cell_id> is extracted from *in_fname*, assuming that its - base name follows the "<cell_id>_merged.fasta" pattern. + The resulting fasta files are written in directory *cell_dir* Return a list of character strings, each one representing a tab-separated line where the first column is the name of a written fasta file and the second column the corresponding number of sequences. 
""" - cell_id = in_fname.name[:-len("_merged.fasta")] + # cell_id = in_fname.name[:-len("_merged.fasta")] logging.debug("%s", cell_id) - cell_dir = out_dir.joinpath(cell_id) + # cell_dir = out_dir.joinpath(cell_id) logging.info("split_fasta: Creating %s", cell_dir) cell_dir.mkdir(parents=True, exist_ok=True) split_graph = make_sequence_similarity_groups(in_fname, cell_dir) @@ -344,55 +342,3 @@ def split_fasta(in_fname, out_dir): fh.write("".join(report_info)) # fh.write(date.today().strftime("%Y-%m-%d-%H:%M:%S\n")) return report_info - - -def main(): - """Main function of the program.""" - logging.basicConfig(level=logging.INFO) - # logging.basicConfig(level=logging.DEBUG) - # https://github.com/jeffdaily/parasail/blob/master/parasail/matrices/ - # nuc44.h - # matrix = parasail.nuc44 - out_dir = Path(sys.argv[2]) - logging.debug("Creating %s", out_dir) - out_dir.mkdir(parents=True, exist_ok=True) - in_dir = Path(sys.argv[1]) - in_fnames = in_dir.glob("*_merged.fasta") - nprocs = int(sys.argv[3]) - 1 - # try: - # nprocs = int(environ["SLURM_NTASKS_PER_NODE"]) - 1 - # except KeyError: - # nprocs = 1 - # mode = "for loop" - # mode = "mp" - mode = "dask" - if mode == "mp": - logging.info("Trying to use %s processors (mode %s).", nprocs, mode) - pool = mp.Pool(processes=nprocs) - computations = [ - pool.apply_async(split_fasta, args=(in_fname, out_dir)) - for in_fname in in_fnames] - report_list = [comp.get() for comp in computations] - elif mode == "dask": - logging.info("Trying to use %s processors (mode %s).", nprocs, mode) - dask.config.set(scheduler="processes") - computations = [] - for in_fname in in_fnames: - computations.append( - dask.delayed(split_fasta)(in_fname, out_dir)) - [report_list] = dask.compute([*computations], num_workers=nprocs) - else: - report_list = [] - for in_fname in in_fnames: - report_list.append(split_fasta(in_fname, out_dir)) - logging.info("Finished computations") - with open(out_dir.joinpath("sequence_numbers.tsv"), "w") as fh: - for 
reports in report_list: - fh.write("".join(reports)) - logging.info("Finished writing reports") - return 0 - - -if __name__ == "__main__": - sys.exit(main()) -