Skip to content
Snippets Groups Projects
Select Git revision
  • 5ebb3bd07768444e061c38a3acd7fe6c4cca38fc
  • master default protected
2 results

ali2consensus.py

Blame
  • ali2consensus.py 4.11 KiB
    #!/usr/bin/env python3
    # Copyright (C) 2020 Blaise Li
    #
    # This program is free software: you can redistribute it and/or modify
    # it under the terms of the GNU General Public License as published by
    # the Free Software Foundation, either version 3 of the License, or
    # (at your option) any later version.
    #
    # This program is distributed in the hope that it will be useful,
    # but WITHOUT ANY WARRANTY; without even the implied warranty of
    # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    # GNU General Public License for more details.
    #
    # You should have received a copy of the GNU General Public License
    # along with this program.  If not, see <https://www.gnu.org/licenses/>.
    """
    This program reads alignment fasta files and generates
    consensus sequences for each one of them.
    """
    import sys
    # from os import environ
    import logging
    # from datetime import date
    from pathlib import Path
    import multiprocessing as mp
    import dask
    from libconsensus import fasta2cons
    
    
    def make_cons_dir(align_dir, out_dir):
        """
        Create a sub-directory of *out_dir* to store
        the files with added consensuses corresponding
        to the alignments stored in *align_dir*.
        """
        cell_id = align_dir.parts[-2]
        cons_dir = out_dir.joinpath(cell_id)
        cons_dir.mkdir(parents=True, exist_ok=True)
        return str(cons_dir)
    
    
    def main():
        """Main function of the program."""
        logging.basicConfig(level=logging.INFO)
        # logging.basicConfig(level=logging.DEBUG)
        out_dir = Path(sys.argv[2])
        logging.debug("Creating %s", out_dir)
        out_dir.mkdir(parents=True, exist_ok=True)
        in_dir = Path(sys.argv[1])
        aligner = "clustalo"
        in_fnames = list(in_dir.glob(f"*/*_aligned_{aligner}.fasta"))
        cell_ids = [
            in_fname.parent.name
            for in_fname in in_fnames]
        cons_fnames = [
            out_dir.joinpath(cell_id, f"{in_fname.stem}_consensus.fasta")
            for (in_fname, cell_id) in zip(in_fnames, cell_ids)]
        with_cons_fnames = [
            out_dir.joinpath(cell_id, f"{in_fname.stem}_with_consensus.fasta")
            for (in_fname, cell_id) in zip(in_fnames, cell_ids)]
        logging.debug(
            "Found the following alignments:\b%s",
            "\n".join(map(str, in_fnames)))
        # TODO filter this to only include those that contain fasta files
        # (There's a spurious "aligned" empty directory created.)
        align_dirs = sorted(set((fname.parent for fname in in_fnames)))
        nprocs = int(sys.argv[3]) - 1
        if nprocs == 0:
            mode = "for loop"
        else:
            # mode = "mp"
            mode = "dask"
        # TODO: adapt to the new fasta2cons interface
        if mode == "mp":
            logging.info("Trying to use %s processors (mode %s).", nprocs, mode)
            pool = mp.Pool(processes=nprocs)
            computations = [
                pool.apply_async(make_cons_dir, args=(align_dir, out_dir))
                for align_dir in align_dirs]
            dir_names = [comp.get() for comp in computations]
            computations = [
                pool.apply_async(fasta2cons, args=fnames)
                for fnames in zip(in_fnames, cons_fnames, with_cons_fnames)]
            out_fnames = [comp.get() for comp in computations]
        elif mode == "dask":
            logging.info("Trying to use %s processors (mode %s).", nprocs, mode)
            dask.config.set(scheduler="processes")
            computations = []
            for align_dir in align_dirs:
                computations.append(
                    dask.delayed(make_cons_dir)(align_dir, out_dir))
            [dir_names] = dask.compute([*computations], num_workers=nprocs)
            computations = []
            for fnames in zip(in_fnames, cons_fnames, with_cons_fnames):
                computations.append(
                    dask.delayed(fasta2cons)(*fnames))
            [out_fnames] = dask.compute([*computations], num_workers=nprocs)
        else:
            dir_names = []
            for align_dir in align_dirs:
                dir_names.append(make_cons_dir(align_dir, out_dir))
            out_fnames = []
            for fnames in zip(in_fnames, cons_fnames, with_cons_fnames):
                out_fnames.append(fasta2cons(*fnames))
        logging.info("Finished consensus computation.")
        logging.debug("\n".join(out_fnames))
        return 0
    
    
    if __name__ == "__main__":
        sys.exit(main())