Select Git revision
fq_demux.py
fq_demux.py 4.15 KiB
#!/usr/bin/env python3
"""This program demultiplexes fastq data based on a given list of
barcodes and a barcode start position. It tries to assign each fastq
record to the most likely barcode, taking into account the sequence
qualities (interpreted as being sanger-encoded)."""
import argparse
import fileinput
import os
from itertools import islice
OPD = os.path.dirname
OPJ = os.path.join
def parse_fastq(input_stream):
"""Generates fastq triplets for text *input_stream*."""
for line in input_stream:
name = line.strip().decode("utf-8")
seq = input_stream.readline().strip().decode("utf-8")
input_stream.readline()
qual = input_stream.readline().strip().decode("utf-8")
yield (name, seq, qual)
def main():
"""Main function of the program."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
"-i", "--input_fastq",
help="Fastq file to demultiplex.")
parser.add_argument(
"-o", "--output_dir",
help="Directory in which to put the demultiplexed fastq files.")
parser.add_argument(
"-b", "--barcodes",
help="Barcodes to which the reads should be attributed.",
nargs="*")
parser.add_argument(
"-s", "--barcode_start",
type=int,
required=True,
help="Position at which the barcode starts (1-based).")
parser.add_argument(
"-m", "--max_diff",
type=int,
default=3,
help="Only assign a record to one of the barcodes if it has "
"no more than *max_diff* differences in its barcode portion.")
args = parser.parse_args()
if args.output_dir:
out_dir = args.output_dir
os.makedirs(out_dir, exist_ok=True)
else:
if args.input_fastq is None:
out_dir = "."
else:
out_dir = OPD(args.input_fastq)
outfiles = {}
bc_len = None
for barcode in args.barcodes:
if bc_len is None:
bc_len = len(barcode)
else:
assert(len(barcode) == bc_len), f"len({barcode}) != {bc_len}"
outfiles[barcode] = open(OPJ(out_dir, f"{barcode}.fastq"), "w")
if not outfiles:
print("No barcode given.")
exit(0)
outfiles["undetermined"] = open(OPJ(
out_dir, f"undetermined.fastq"), "w")
bc_start = args.barcode_start - 1
def bc_prob_profile(seq, qual):
# return [(letter, 10 ** (-quality / 10))
return [(letter, 10 ** (-ord(quality) / 10))
for letter, quality
in islice(zip(seq, qual), bc_start, bc_start + bc_len)]
def barcode_prob_computer(seq, qual):
bpp = bc_prob_profile(seq, qual)
def prob(barcode):
bc_prob = 1
nb_diff = 0
for ((seq_letter, prob_err), bc_letter) in zip(bpp, barcode):
if seq_letter == bc_letter:
bc_prob *= (1 - prob_err)
else:
nb_diff += 1
bc_prob *= (prob_err / 3)
return bc_prob, nb_diff
return prob
if args.input_fastq is None:
fq_files = ("-",)
else:
fq_files = (args.input_fastq,)
with fileinput.input(files=fq_files,
openhook=fileinput.hook_compressed) as fq_in:
for (name, seq, qual) in parse_fastq(fq_in):
bc_prob = barcode_prob_computer(seq, qual)
(best_bc, best_prob) = ("", 0.0)
best_diff = bc_len
for barcode in args.barcodes:
(this_prob, this_diff) = bc_prob(barcode)
if this_prob > best_prob:
(best_bc,
best_prob,
best_diff) = (barcode, this_prob, this_diff)
bc_comments = f"{best_bc}:{best_prob}:{best_diff}"
fq_repr = f"{name} {bc_comments}\n{seq}\n+\n{qual}\n"
if best_diff <= args.max_diff:
outfiles[best_bc].write(fq_repr)
else:
outfiles["undetermined"].write(fq_repr)
for outfile in outfiles.values():
outfile.close()
if __name__ == "__main__":
exit(main())