Select Git revision
ali2consensus.py
ali2consensus.py 4.11 KiB
#!/usr/bin/env python3
# Copyright (C) 2020 Blaise Li
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
This program reads alignment fasta files and generates
consensus sequences for each one of them.
"""
import sys
# from os import environ
import logging
# from datetime import date
from pathlib import Path
import multiprocessing as mp
import dask
from libconsensus import fasta2cons
def make_cons_dir(align_dir, out_dir):
"""
Create a sub-directory of *out_dir* to store
the files with added consensuses corresponding
to the alignments stored in *align_dir*.
"""
cell_id = align_dir.parts[-2]
cons_dir = out_dir.joinpath(cell_id)
cons_dir.mkdir(parents=True, exist_ok=True)
return str(cons_dir)
def main():
"""Main function of the program."""
logging.basicConfig(level=logging.INFO)
# logging.basicConfig(level=logging.DEBUG)
out_dir = Path(sys.argv[2])
logging.debug("Creating %s", out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
in_dir = Path(sys.argv[1])
aligner = "clustalo"
in_fnames = list(in_dir.glob(f"*/*_aligned_{aligner}.fasta"))
cell_ids = [
in_fname.parent.name
for in_fname in in_fnames]
cons_fnames = [
out_dir.joinpath(cell_id, f"{in_fname.stem}_consensus.fasta")
for (in_fname, cell_id) in zip(in_fnames, cell_ids)]
with_cons_fnames = [
out_dir.joinpath(cell_id, f"{in_fname.stem}_with_consensus.fasta")
for (in_fname, cell_id) in zip(in_fnames, cell_ids)]
logging.debug(
"Found the following alignments:\b%s",
"\n".join(map(str, in_fnames)))
# TODO filter this to only include those that contain fasta files
# (There's a spurious "aligned" empty directory created.)
align_dirs = sorted(set((fname.parent for fname in in_fnames)))
nprocs = int(sys.argv[3]) - 1
if nprocs == 0:
mode = "for loop"
else:
# mode = "mp"
mode = "dask"
# TODO: adapt to the new fasta2cons interface
if mode == "mp":
logging.info("Trying to use %s processors (mode %s).", nprocs, mode)
pool = mp.Pool(processes=nprocs)
computations = [
pool.apply_async(make_cons_dir, args=(align_dir, out_dir))
for align_dir in align_dirs]
dir_names = [comp.get() for comp in computations]
computations = [
pool.apply_async(fasta2cons, args=fnames)
for fnames in zip(in_fnames, cons_fnames, with_cons_fnames)]
out_fnames = [comp.get() for comp in computations]
elif mode == "dask":
logging.info("Trying to use %s processors (mode %s).", nprocs, mode)
dask.config.set(scheduler="processes")
computations = []
for align_dir in align_dirs:
computations.append(
dask.delayed(make_cons_dir)(align_dir, out_dir))
[dir_names] = dask.compute([*computations], num_workers=nprocs)
computations = []
for fnames in zip(in_fnames, cons_fnames, with_cons_fnames):
computations.append(
dask.delayed(fasta2cons)(*fnames))
[out_fnames] = dask.compute([*computations], num_workers=nprocs)
else:
dir_names = []
for align_dir in align_dirs:
dir_names.append(make_cons_dir(align_dir, out_dir))
out_fnames = []
for fnames in zip(in_fnames, cons_fnames, with_cons_fnames):
out_fnames.append(fasta2cons(*fnames))
logging.info("Finished consensus computation.")
logging.debug("\n".join(out_fnames))
return 0
if __name__ == "__main__":
sys.exit(main())