Commit 84f40977 authored by Blaise Li

Added cythonized fastq extractor.

Also factored out the code that computes the read statistics.
The parallel version does not seem to work, though,
due to serialization issues.
parent eb009652
__version__ = "0.2"
from .libreads import (
ali2fq,
bam2fastq,
make_read_statter,
plot_base_logos_along_read,
plot_base_logos_by_size,
......
#!/usr/bin/env python3
# vim: set fileencoding=utf-8 :
# cython: language_level=3
"""
This library contains a cythonized version of utilities to work with bam files.
"""
# http://stackoverflow.com/a/23575771/1878788
# Also works by setting language_level as comment
# at the top of the file.
#from __future__ import print_function
#from pybedtools import Interval
from pysam.libchtslib cimport BAM_FREVERSE
#cdef int BAM_FREVERSE = 16
#from pysam.libcalignmentfile cimport AlignmentFile
from pysam.libcalignedsegment cimport AlignedSegment
from pysam.libcutils cimport array_to_qualitystring
cdef str cali2fq(AlignedSegment ali):
"""Return a string representing the read aligned in AlignedSegment *ali*"""
seq = ali.get_forward_sequence()
qual = array_to_qualitystring(ali.get_forward_qualities())
name = ali.query_name
return f"@{name}\n{seq}\n+\n{qual}\n"
def ali2fq(ali):
"""Python-callable wrapper around the C-level cali2fq."""
return cali2fq(ali)
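A minimal usage sketch for ali2fq (the BAM file name and region below are hypothetical; assumes an indexed BAM opened with pysam):

import pysam
from libreads.libbamutils import ali2fq

with pysam.AlignmentFile("example.bam", "rb") as bam:  # hypothetical file
    for ali in bam.fetch("chrI", 0, 1000):  # hypothetical region
        # Each record comes out as a newline-terminated 4-line FASTQ block.
        print(ali2fq(ali), end="")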
def bam2fastq(bam_reads,
chrom,
start,
end,
strand=None,
max_read_len=None):
"""Generate fastq representations of reads mapped in *bam_reads*.
Only reads in the region defined by *chrom*, *start* and *end* are considered.
If *strand* is "-" or "+", only the reads from the corresponding strand
will be generated.
If *max_read_len* is set, only the reads with a length of
at most *max_read_len* will be generated.
"""
if strand is None:
if max_read_len is None:
for ali in bam_reads.fetch(chrom, start, end):
yield cali2fq(ali)
else:
for ali in bam_reads.fetch(chrom, start, end):
if ali.query_length <= max_read_len:
yield cali2fq(ali)
elif strand == "+":
if max_read_len is None:
for ali in bam_reads.fetch(chrom, start, end):
if ali.flag & BAM_FREVERSE == 0:
yield cali2fq(ali)
else:
for ali in bam_reads.fetch(chrom, start, end):
if ali.flag & BAM_FREVERSE == 0:
if ali.query_length <= max_read_len:
yield cali2fq(ali)
elif strand == "-":
if max_read_len is None:
for ali in bam_reads.fetch(chrom, start, end):
if ali.flag & BAM_FREVERSE != 0:
yield cali2fq(ali)
else:
for ali in bam_reads.fetch(chrom, start, end):
if ali.flag & BAM_FREVERSE != 0:
if ali.query_length <= max_read_len:
yield cali2fq(ali)
else:
raise ValueError("strand can only be \"+\", \"-\" or None.")
# def bam2signal(bam_reads,
# chrom: str,
# start: int,
# end: int,
# max_read_len: int) -> Mapping[str, Array[int]]:
# """
# Extract 5'-end signal from `pysam.AlignmentFile` *bam_reads*
# for the region defined by *chrom*, *start* and *end*.
# """
# signal: Mapping[str, Array[int]] = defaultdict(
# lambda: np.zeros(end-start + max_read_len, dtype=int))
# for read in collapse_and_sort(bam_reads.fetch(chrom, start, end)):
# try:
# subtracting start, to translate chromosomal coordinates into array indices
# signal[read.strand][read.start - start] += 1
# except IndexError as err:
# # 5' of reverse reads may fall outside the right-most boundary of the island
# print(str(err))
# print(len(signal[read.strand]))
# print(read)
# return signal
def main():
"""Example use case."""
print("No examples to show here.")
return 0
if __name__ == "__main__":
import sys
sys.exit(main())
@@ -8,6 +8,7 @@ import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from mappy import fastx_read
from .libbamutils import ali2fq, bam2fastq
@@ -35,63 +36,34 @@ def make_read_statter(max_len=None, alphabet=None, processes=1):
letter2index = dict((letter, idx) for (idx, letter) in enumerate(alphabet))
# To use as DataFrame index
base_index = pd.Index(list(alphabet), name="base")
# TODO: try to speed this up using ProcessPoolExecutor?
def extract_seq_info(seq):
seq_len = len(seq)
if seq_len > max_len:
# raise ValueError(
#     f"{fastx_filename} contains reads of size {seq_len} "
#     f"(> {max_len})")
raise ValueError(
f"The file contains reads of size {seq_len} "
f"(> {max_len})")
as_letter_indices = [letter2index[letter] for letter in seq]
letter_indices_from_start = as_letter_indices[slicer_from_start]
letter_indices_from_end = as_letter_indices[slicer_from_end]
return (seq_len, seq[0], seq[-1],
letter_indices_from_start,
letter_indices_from_end,
range(seq_len))
def fastx2read_stats(fastx_filename):
"""Return statistics about reads from file *fastx_filename*.
The statistics consist of a dict of tables with the following keys:
* "size": read size distribution as a :class:`pandas.Series`
* "composition_at_start"
* "composition_at_end"
* "composition_from_start"
* "composition_from_end"
Each of these tables is a :class:`pandas.DataFrame`
with *max_len* columns.
"""
size = defaultdict(int)
first_base = defaultdict(lambda: defaultdict(int))
last_base = defaultdict(lambda: defaultdict(int))
composition_from_start = np.zeros((len(alphabet), max_len))
composition_from_end = np.zeros((len(alphabet), max_len))
for (_, seq, *_) in fastx_read(fastx_filename):
seq_len = len(seq)
if seq_len > max_len:
raise ValueError(
f"{fastx_filename} contains reads of size {seq_len} "
f"(> {max_len})")
for (pos, letter) in enumerate(seq[slicer_from_start]):
composition_from_start[letter2index[letter]][pos] += 1
for (pos, letter) in enumerate(seq[slicer_from_end]):
composition_from_end[letter2index[letter]][pos] += 1
size[seq_len] += 1
first_base[seq_len][seq[0]] += 1
last_base[seq_len][seq[-1]] += 1
composition_at_start = pd.DataFrame.from_dict(first_base).reindex(
base_index)
composition_at_start.columns.name = "read length"
composition_at_end = pd.DataFrame.from_dict(last_base).reindex(
base_index)
composition_at_end.columns.name = "read length"
composition_from_start = pd.DataFrame(
composition_from_start, index=base_index)
composition_from_start.columns = pd.Index(
range(1, max_len + 1), name="position_from_start")
composition_from_end = pd.DataFrame(
composition_from_end, index=base_index)
composition_from_end.columns = pd.Index(
range(1, max_len + 1), name="position_from_end")
return {
"total": pd.Series({"total": sum(size.values())}),
"size": pd.Series(size).sort_index(),
"composition_at_start": composition_at_start.fillna(0),
"composition_at_end": composition_at_end.fillna(0),
"composition_from_start": composition_from_start.fillna(0),
"composition_from_end": composition_from_end.fillna(0)
}
if processes > 1:
executor = ProcessPoolExecutor(max_workers=processes)
def do_extract_seq_info(seq_generator):
yield from executor.map(
extract_seq_info, seq_generator, chunksize=10000)
else:
def do_extract_seq_info(seq_generator):
yield from map(extract_seq_info, seq_generator)
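The serialization issues mentioned in the commit message are most likely pickling failures: ProcessPoolExecutor.map pickles the callable in order to send it to the worker processes, and the standard pickle module cannot serialize functions defined inside another function, which is exactly what extract_seq_info is. A standalone sketch of the failure mode (all names hypothetical):

from concurrent.futures import ProcessPoolExecutor

def make_worker(offset):
    # A closure like extract_seq_info: defined at function scope,
    # hence not picklable by the standard pickle module.
    def worker(value):
        return value + offset
    return worker

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        try:
            print(list(executor.map(make_worker(1), range(4))))
        except Exception as err:
            # Typically: "Can't pickle local object 'make_worker.<locals>.worker'"
            print(f"{type(err).__name__}: {err}")

Defining extract_seq_info at module level and binding max_len, letter2index and the slicers to it with functools.partial would be one way around this.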
def fastx2read_stats_parallel(fastx_filename):
def fastx2read_stats(fastx_filename):
"""Return statistics about reads from file *fastx_filename*.
The statistics consist of a dict of tables with the following keys:
@@ -109,28 +81,12 @@ def make_read_statter(max_len=None, alphabet=None, processes=1):
last_base = defaultdict(lambda: defaultdict(int))
composition_from_start = np.zeros((len(alphabet), max_len))
composition_from_end = np.zeros((len(alphabet), max_len))
executor = ProcessPoolExecutor(max_workers=processes)
# TODO: try to speed this up using ProcessPoolExecutor?
def extract_seq_info(seq):
seq_len = len(seq)
if seq_len > max_len:
raise ValueError(
f"{fastx_filename} contains reads of size {seq_len} "
f"(> {max_len})")
as_letter_indices = [letter2index[letter] for letter in seq]
letter_indices_from_start = as_letter_indices[slicer_from_start]
letter_indices_from_end = as_letter_indices[slicer_from_end]
return (seq_len, seq[0], seq[-1],
letter_indices_from_start,
letter_indices_from_end,
range(seq_len))
seq_generator = (seq for (_, seq, *_) in fastx_read(fastx_filename))
for (seq_len, start_letter, end_letter,
letter_indices_from_start,
letter_indices_from_end,
positions) in executor.map(
extract_seq_info, seq_generator, chunksize=10000):
positions) in do_extract_seq_info(seq_generator):
composition_from_start[letter_indices_from_start, positions] += 1
composition_from_end[letter_indices_from_end, positions] += 1
size[seq_len] += 1
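The two composition updates above use NumPy fancy indexing with paired index sequences: element (letter_indices[i], positions[i]) is incremented for each i. This is correct here because within a single read each (base, position) pair occurs at most once; with repeated pairs, the buffered += would count each pair only once and np.add.at would be needed instead. A small self-contained sketch:

import numpy as np

alphabet = "ACGTN"
letter2index = {letter: idx for (idx, letter) in enumerate(alphabet)}
counts = np.zeros((len(alphabet), 8), dtype=int)

seq = "ACGTAC"  # hypothetical read
rows = [letter2index[letter] for letter in seq]  # which base
cols = range(len(seq))                           # at which position
# Increments counts[rows[i], cols[i]] for each i.
counts[rows, cols] += 1
print(counts)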
@@ -158,8 +114,7 @@ def make_read_statter(max_len=None, alphabet=None, processes=1):
"composition_from_start": composition_from_start.fillna(0),
"composition_from_end": composition_from_end.fillna(0)
}
if processes > 1:
return fastx2read_stats_parallel
return fastx2read_stats
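A usage sketch for the statter returned by the factory (the FASTQ file name is hypothetical; max_len and alphabet are given explicitly since their defaults are handled outside this excerpt):

from libreads import make_read_statter

read_statter = make_read_statter(max_len=50, alphabet="ACGTN")
stats = read_statter("reads.fastq")  # hypothetical file
print(stats["total"])                   # total number of reads
print(stats["size"])                    # read size distribution
print(stats["composition_from_start"])  # per-position base counts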
@@ -265,7 +220,8 @@ def plot_logo(data, axis, xlabel=None, ylabel=None, legend=None):
def plot_base_logos_by_size(datas,
data_labels=None, fig_path=None, ylabel=None):
data_labels=None, fig_path=None, ylabel=None,
fig=None, axes=None):
"""
Plot bar graphs representing read composition using a representation
where bar heights are proportional to the information content.
@@ -283,7 +239,14 @@
"""
if data_labels is None:
data_labels = [str(i) for i in range(len(datas))]
(fig, axes, data_labels) = vstack_fig_setup(datas, data_labels)
if axes is None:
(fig, axes, data_labels) = vstack_fig_setup(datas, data_labels)
legend_holder = fig
else:
# Axes were provided by the caller: attach the legend to the figure
# if one was also given, otherwise to the first axis.
if fig is None:
legend_holder = axes[0]
else:
legend_holder = fig
for (nucl_table, axis, label) in zip(datas, axes, data_labels):
(handles, labels) = plot_logo(
nucl_table, axis=axis, legend=False, xlabel=False, ylabel=False)
@@ -296,7 +259,7 @@ def plot_base_logos_by_size(datas,
plt.ylabel(ylabel, labelpad=30)
plt.title("")
label2handle = dict(islice(zip(labels, handles), 5))
fig.legend(
legend_holder.legend(
handles=[label2handle[nucl] for nucl in ["A", "C", "G", "T", "N"]],
labels=["A", "C", "G", "T", "N"],
loc=7)
@@ -308,7 +271,8 @@ def plot_base_logos_by_size(datas,
def plot_base_logos_along_read(datas,
data_labels=None, fig_path=None,
xlabel=None, ylabel=None):
xlabel=None, ylabel=None,
fig=None, axes=None):
"""
Plot bar graphs representing read composition using a representation
where bar heights are proportional to the information content.
@@ -326,7 +290,14 @@
"""
if data_labels is None:
data_labels = [str(i) for i in range(len(datas))]
(fig, axes, data_labels) = vstack_fig_setup(datas, data_labels)
if axes is None:
(fig, axes, data_labels) = vstack_fig_setup(datas, data_labels)
legend_holder = fig
else:
# Axes were provided by the caller: attach the legend to the figure
# if one was also given, otherwise to the first axis.
if fig is None:
legend_holder = axes[0]
else:
legend_holder = fig
for (nucl_table, axis, label) in zip(datas, axes, data_labels):
(handles, labels) = plot_logo(
nucl_table, axis=axis, legend=False, xlabel=False, ylabel=False)
@@ -341,7 +312,7 @@ def plot_base_logos_along_read(datas,
plt.ylabel(ylabel, labelpad=30)
plt.title("")
label2handle = dict(islice(zip(labels, handles), 5))
fig.legend(
legend_holder.legend(
handles=[label2handle[nucl] for nucl in "ACGTN"],
labels="ACGTN",
loc=7)
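A sketch of the new fig/axes hooks, drawing into caller-created subplots instead of letting vstack_fig_setup build them (the stats tables are assumed to come from a read statter as above, and the two-row layout is an assumption):

import matplotlib.pyplot as plt
from libreads import plot_base_logos_along_read

(fig, axes) = plt.subplots(nrows=2, ncols=1, figsize=(8, 6))
plot_base_logos_along_read(
    [stats["composition_from_start"], stats["composition_from_end"]],
    data_labels=["from 5' end", "from 3' end"],
    xlabel="position", ylabel="bits",
    fig=fig, axes=axes)  # with both given, the legend is attached to *fig*
fig.savefig("base_logos.pdf")  # hypothetical output path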
......
[build-system]
requires = [
"setuptools",
"Cython",
"pysam"
]
build-backend = "setuptools.build_meta"
from setuptools import setup, find_packages
#from Cython.Build import cythonize
from setuptools.extension import Extension
from pysam import get_include as pysam_get_include
name = "libreads"
@@ -10,6 +12,13 @@ for line in open("%s/__init__.py" % name):
exec(line.strip())
# https://github.com/cython/cython/blob/master/docs/src/reference/compilation.rst#configuring-the-c-build
extensions = [
Extension(
"libreads.libbamutils", ["libreads/libbamutils.pyx"],
include_dirs=pysam_get_include()),
]
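An equivalent, more explicit variant would pass the extensions through cythonize, matching the commented-out import above; a sketch:

from Cython.Build import cythonize

extensions = cythonize(
    [
        Extension(
            "libreads.libbamutils", ["libreads/libbamutils.pyx"],
            include_dirs=pysam_get_include()),
    ],
    language_level=3,  # same as the "# cython: language_level=3" directive
)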
setup(
name=name,
version=__version__,
@@ -19,7 +28,14 @@ setup(
license="MIT",
packages=find_packages(),
# scripts=["scripts/"],
ext_modules=extensions,
setup_requires=[
"wheel",
"cython",
#"pysam",
],
install_requires=[
"pysam",
#"libworkflows @ git+https://gitlab+deploy-token-31:isEzpsgbNf2sJMdUDy2g@gitlab.pasteur.fr/bli/libworkflows.git",
"matplotlib",
#"networkx",
......