split-genome-to-gene.py 2.56 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
#!/usr/bin/env python3
from Bio import SeqIO
from Bio.SeqRecord import SeqRecord
from pathlib import Path
import click
from itertools import chain
import re


@click.command()
@click.option(
    "-d",
    "--directory",
    type=click.Path(exists=True, dir_okay=True, file_okay=True),
    help="Genome input source",
)
@click.option(
    "-o", "--output-dir", type=click.Path("wb"), help="path to the output dir"
)
@click.option("-f", "--output-format", type=str, help="output sequence format")
@click.option("-t", "--feature-type", type=str, help="")
def split_genomes(directory, output_dir, output_format, feature_type):
    genomes = gen_path_with_format(directory)
    gen_genomes = gen_genomes_with_steam(genomes)
    for filename, genome in gen_genomes:
        write_sequence(genome, filename, output_dir, output_format, feature_type)


def write_sequence(genome, filename, directory, sequence_format, feature_type):
    with open(
        Path(directory) / (filename + "." + sequence_format), "w"
    ) as output_handle:
        print("# Write file " + filename)
        for record in genome:
            for feat in record.features:
                if feat.type == feature_type:
                    feature_seq = feat.extract(record.seq)
                    seq_record = SeqRecord(feature_seq)
                    locus_tags = feat.qualifiers.get("locus_tag")
                    product = feat.qualifiers.get("product")
                    if locus_tags and len(locus_tags) > 0:
                        seq_record.id = locus_tags[0]
                        if product:
                            p = re.compile(r"^\s+$")
                            if not p.match(product[0]):
                                seq_record.description = product[0]
                        SeqIO.write(seq_record, output_handle, sequence_format)
                    else:
                        raise ("No locus tag !!!")


def gen_path_with_format(input_str):
    input_path = Path(input_str)
    if input_path.is_dir():
        for genome_path in chain(
            map(lambda genome: ("embl", genome), input_path.rglob("*.embl")),
            map(lambda genome: ("genbank", genome), input_path.rglob("*.genbank")),
        ):
            yield genome_path
    else:
        suffix = input_path.suffix
        if suffix == ".embl":
            yield ("embl", input_path)
        else:
            yield ("genbank", input_path)


def gen_genomes_with_steam(genomes):
    for genome_format, genome in genomes:
        yield (str(Path(genome.stem)), SeqIO.parse(genome, genome_format))


if __name__ == "__main__":
    split_genomes()