Skip to content
Snippets Groups Projects
Commit 5ebb3bd0 authored by Blaise Li's avatar Blaise Li
Browse files

Functions to compute alignment consensuses.

parents
Branches
Tags
No related merge requests found
# Compiled python modules.
*.pyc
# Setuptools distribution folder.
/dist/
# Python egg metadata, regenerated from source files by setuptools.
/*.egg-info
# Backups
*~
This diff is collapsed.
# Utilities to make a consensus from a sequence alignment.
This is a very preliminary version, with very few utilities.
## Installing
Get the source using `git clone git@gitlab.pasteur.fr:bli/libconsensus.git`, `cd` into it and run `python3 -m pip install .`
It might also work directly:
python3 -m pip install git+ssh://git@gitlab.pasteur.fr/bli/libconsensus.git
#!/usr/bin/env python3
# Copyright (C) 2020 Blaise Li
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
This program reads alignment fasta files and generates
consensus sequences for each one of them.
"""
import sys
# from os import environ
import logging
# from datetime import date
from pathlib import Path
import multiprocessing as mp
import dask
from libconsensus import fasta2cons
def make_cons_dir(align_dir, out_dir):
"""
Create a sub-directory of *out_dir* to store
the files with added consensuses corresponding
to the alignments stored in *align_dir*.
"""
cell_id = align_dir.parts[-2]
cons_dir = out_dir.joinpath(cell_id)
cons_dir.mkdir(parents=True, exist_ok=True)
return str(cons_dir)
def main():
"""Main function of the program."""
logging.basicConfig(level=logging.INFO)
# logging.basicConfig(level=logging.DEBUG)
out_dir = Path(sys.argv[2])
logging.debug("Creating %s", out_dir)
out_dir.mkdir(parents=True, exist_ok=True)
in_dir = Path(sys.argv[1])
aligner = "clustalo"
in_fnames = list(in_dir.glob(f"*/*_aligned_{aligner}.fasta"))
cell_ids = [
in_fname.parent.name
for in_fname in in_fnames]
cons_fnames = [
out_dir.joinpath(cell_id, f"{in_fname.stem}_consensus.fasta")
for (in_fname, cell_id) in zip(in_fnames, cell_ids)]
with_cons_fnames = [
out_dir.joinpath(cell_id, f"{in_fname.stem}_with_consensus.fasta")
for (in_fname, cell_id) in zip(in_fnames, cell_ids)]
logging.debug(
"Found the following alignments:\b%s",
"\n".join(map(str, in_fnames)))
# TODO filter this to only include those that contain fasta files
# (There's a spurious "aligned" empty directory created.)
align_dirs = sorted(set((fname.parent for fname in in_fnames)))
nprocs = int(sys.argv[3]) - 1
if nprocs == 0:
mode = "for loop"
else:
# mode = "mp"
mode = "dask"
# TODO: adapt to the new fasta2cons interface
if mode == "mp":
logging.info("Trying to use %s processors (mode %s).", nprocs, mode)
pool = mp.Pool(processes=nprocs)
computations = [
pool.apply_async(make_cons_dir, args=(align_dir, out_dir))
for align_dir in align_dirs]
dir_names = [comp.get() for comp in computations]
computations = [
pool.apply_async(fasta2cons, args=fnames)
for fnames in zip(in_fnames, cons_fnames, with_cons_fnames)]
out_fnames = [comp.get() for comp in computations]
elif mode == "dask":
logging.info("Trying to use %s processors (mode %s).", nprocs, mode)
dask.config.set(scheduler="processes")
computations = []
for align_dir in align_dirs:
computations.append(
dask.delayed(make_cons_dir)(align_dir, out_dir))
[dir_names] = dask.compute([*computations], num_workers=nprocs)
computations = []
for fnames in zip(in_fnames, cons_fnames, with_cons_fnames):
computations.append(
dask.delayed(fasta2cons)(*fnames))
[out_fnames] = dask.compute([*computations], num_workers=nprocs)
else:
dir_names = []
for align_dir in align_dirs:
dir_names.append(make_cons_dir(align_dir, out_dir))
out_fnames = []
for fnames in zip(in_fnames, cons_fnames, with_cons_fnames):
out_fnames.append(fasta2cons(*fnames))
logging.info("Finished consensus computation.")
logging.debug("\n".join(out_fnames))
return 0
if __name__ == "__main__":
sys.exit(main())
__copyright__ = "Copyright (C) 2020 Blaise Li"
__licence__ = "GNU GPLv3"
__version__ = 0.1
from .libconsensus import (
ali2cons,
fasta2cons,
)
# Copyright (C) 2020 Blaise Li
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
This library provides utilities to generate a consensus
from an alignment of sequences.
"""
import logging
# from datetime import date
from itertools import dropwhile
from functools import partial
from operator import gt, itemgetter, or_
from pathlib import Path
from shutil import copyfileobj
import pandas as pd
from cytoolz import (
accumulate, compose, dissoc,
first, frequencies, juxt, reduce)
from mappy import fastx_read
# Numerical values of nucleotides, as powers of 1
A = 1
C = 2
G = 4
T = 8
# Values of ambiguity codes, as sums of powers of 2
R = A + G
Y = C + T
S = G + C
W = A + T
K = G + T
M = A + C
B = C + G + T
D = A + G + T
H = A + C + T
V = A + C + G
N = A + C + G + T
# Converters between numerical values and letters
nucl2val = dict(zip( # pylint: disable=C0103
"ACGTRYSWKMBDHVN",
[A, C, G, T, R, Y, S, W, K, M, B, D, H, V, N])).get
val2nucl = dict(zip( # pylint: disable=C0103
[A, C, G, T, R, Y, S, W, K, M, B, D, H, V, N],
"ACGTRYSWKMBDHVN")).get
def iupac(nucls):
"""
Compute the IUPAC ambiguity code resulting from the union of *nucls*.
"""
return val2nucl(reduce(or_, map(nucl2val, nucls)))
def fasta2seqlines(fname):
"""
Generate lists of letters corresponding to sequences
in fasta-formatted file *fname*.
"""
for (_, seq, *_) in fastx_read(str(fname)):
# Get sequence from next line, convert it to list of letters.
yield list(seq)
def count(letters, nogap=False):
"""
Count the letters in *letters* and return a sorted list of
pairs (letter, count).
If *nogap* is set to True, gaps are ignored.
"""
if nogap:
return sorted(
dissoc(frequencies(letters), "-").items(),
key=itemgetter(1), reverse=True)
return sorted(
frequencies(letters).items(),
key=itemgetter(1), reverse=True)
def keyset_add(cumul_counts, counter_item):
"""
Perform one step of accumulation in a set
using first elements of tuples.
"""
return or_(cumul_counts[0], {counter_item[0]})
def counts_add(cumul_counts, counter_item):
"""
Perform one step of summing
using second elements of tuples.
"""
return cumul_counts[1] + counter_item[1]
def col_consensus(column, cum_freq=0.75, do_iupac=False):
"""
Determine the consensus of the letters present in *column*.
*cum_freq* is the frequency required for the majority letter
to be used as consensus.
When a single letter accounts for the top *cum_freq* letters in the column,
it is used as consensus.
If *do_iupac* is set to True, and no "-" is contained in the column,
then the ambiguity code of the nucleotides comprising the top
*cum_freq* letters in the column is used. Otherwise, "N" is used.
"""
# letter_counts = count(column)
# Number of sequences entering in the consensus
cons_count = len(column) * cum_freq
cum_counts = accumulate(
juxt(keyset_add, counts_add),
# letter_counts,
count(column),
initial=(set(), 0))
# True if cons_count > the second element
pred = compose(partial(gt, cons_count), itemgetter(1))
# Set of letters accounting for this cumulated count
letters = first(dropwhile(pred, cum_counts))[0]
try:
# If there is only one letter, we use it.
[letter] = letters
return letter
except ValueError:
if "-" not in letters and do_iupac:
return iupac(letters)
# Otherwise, consensus will be "N", because
# we don't know how to handle "-"
return "N"
def ali2cons(fname):
"""
Generate the consensus of a sequence alignement present in
file *fname*.
*fname* should be the path to a fasta file containing aligned
sequences.
"""
ali = pd.DataFrame(fasta2seqlines(fname))
return "".join(ali.apply(col_consensus))
def fasta2cons(ali_fname, cons_alone, ali_with_cons):
"""
Generate the consensus of a sequence alignement present in
file *ali_fname* and write it alone in *cons_alone* as well as
together with the aligned sequences in *ali_with_cons*.
*ali_fname* should be the path to a fasta file containing aligned
sequences.
"""
# Ensure we have a pathlib.Path object
ali_fname = Path(ali_fname)
cons_seq = ali2cons(ali_fname)
Path(cons_alone).parent.mkdir(parents=True, exist_ok=True)
with open(cons_alone, "w") as out_fasta:
out_fasta.write(f">{ali_fname.stem}_consensus\n")
out_fasta.write(f"{cons_seq.replace('-', '')}\n")
logging.info("%s written", cons_alone)
Path(ali_with_cons).parent.mkdir(parents=True, exist_ok=True)
with open(ali_with_cons, "w") as out_fasta:
with open(ali_fname) as in_fasta:
copyfileobj(in_fasta, out_fasta)
out_fasta.write(f">{ali_fname.stem}_consensus\n")
out_fasta.write(f"{cons_seq}\n")
logging.info("%s written", ali_with_cons)
return str(ali_with_cons)
[build-system]
requires = [
"setuptools",
]
build-backend = "setuptools.build_meta"
setup.py 0 → 100644
# Copyright (C) 2020 Blaise Li
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
from setuptools import setup, find_packages
#from Cython.Build import cythonize
#from distutils.extension import Extension
name = "libconsensus"
# Adapted from Biopython
__version__ = "Undefined"
for line in open("%s/__init__.py" % name):
if (line.startswith('__version__')):
exec(line.strip())
# https://github.com/cython/cython/blob/master/docs/src/reference/compilation.rst#configuring-the-c-build
#extensions = [
# Extension(
# "libreads.libbamutils", ["libreads/libbamutils.pyx"],
# include_dirs=pysam_get_include()),
# ]
setup(
name=name,
version=__version__,
description="Miscellaneous things to make a consensus out of a sequence alignment.",
author="Blaise Li",
author_email="blaise.li@normalesup.org",
license="GNU GPLv3",
python_requires=">=3.6, <4",
packages=find_packages(),
scripts=["bin/ali2consensus.py"],
#ext_modules = extensions,
setup_requires=[
#"wheel",
#"cython",
#"pysam",
],
install_requires=[
"cytoolz",
"dask",
"mappy",
"pandas",
],
)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment