Commit 2b90d85f authored by Remi  PLANEL's avatar Remi PLANEL
Browse files

Add script to clean gff

parent 52d33f5a
#!/usr/bin/env python3
import re
from pathlib import Path
import click
class Error(Exception):
"""Base class for other exceptions"""
pass
class NoFeatureId(Error):
"""Raised when no feature id is found"""
pass
class NoLocusTag(Error):
"""Raised when no feature id is found"""
pass
class NoGeneFeature(Error):
"""Raised when no feature id is found"""
pass
class NoMRNAFeature(Error):
"""Raised when no feature id is found"""
pass
class NoCDSFeature(Error):
"""Raised when no feature id is found"""
pass
@click.command()
@click.option(
"-g",
"--gff",
type=click.Path(exists=True, dir_okay=True, file_okay=True),
help="GFF genome",
)
@click.option(
"-o", "--output-dir", type=click.Path("wb"), help="path to the output dir"
)
def link_features(gff, output_dir):
input_path = Path(gff)
feature_index = index_features(input_path)
write_gff(feature_index, input_path, output_dir)
# add_feature_parent(input_path, feature_index, output_dir)
def write_gff(feature_index, input_path, output_dir):
filename = input_path.stem
ext = input_path.suffix
output_path = Path(output_dir) / (filename + "-with-parent" + ext)
with open(output_path, "w") as output_handle:
for feat_group_id, feat_group in feature_index.items():
# check if ncRNA
if feat_group_id.startswith("lppnc"):
write_ncrna(feat_group_id, feat_group, output_handle)
elif "gene" in feat_group and "mRNA" in feat_group:
write_gene(feat_group_id, feat_group, output_handle)
else:
for feat_type, lines in feat_group.items():
if feat_type == "gene":
feat_id = feat_group_id
output_handle.write(
clean_entry_no_parent(lines[0], feat_id, feat_group_id)
+ "\n"
)
else:
for i, line in enumerate(lines):
feat_id = feat_group_id + "." + feat_type + "." + str(i + 1)
output_handle.write(
clean_entry_no_parent(line, feat_id, feat_group_id)
+ "\n"
)
print("DONE")
def write_ncrna(feature_group_id, feature_group, output_handle):
nc_rna_id = None
if "CDS" in feature_group:
feat_type = "NcRNA"
cds_line = feature_group["CDS"][0]
columns = cds_line.split("\t")
columns[2] = feat_type
nc_rna_id = feature_group_id + "." + feat_type
entry = clean_entry("\t".join(columns), None, nc_rna_id, feature_group_id,)
output_handle.write(entry + "\n")
else:
raise Exception(f"No CDS for {feature_group_id}")
if "promoter" in feature_group:
for i, line in enumerate(feature_group["promoter"]):
promoter_id = feature_group_id + ".promoter." + str(i + 1)
entry = clean_entry(line, nc_rna_id, promoter_id, feature_group_id)
output_handle.write(entry + "\n")
if "TSS" in feature_group:
for i, line in enumerate(feature_group["TSS"]):
tss_id = feature_group_id + ".TSS." + str(i + 1)
entry = clean_entry(line, nc_rna_id, tss_id, feature_group_id)
output_handle.write(entry + "\n")
def write_gene(feat_group_id, feat_group, output_handle):
gene_id = None
mrna_id = None
if "gene" in feat_group:
gene = feat_group["gene"][0]
gene_id = feat_group_id
entry = clean_entry(gene, None, gene_id, feat_group_id)
output_handle.write(entry + "\n")
else:
raise Exception(f"missing gene form {feat_group_id}")
if "mRNA" in feat_group:
line = feat_group["mRNA"][0]
mrna_id = feat_group_id + ".mRNA"
if gene_id:
entry = clean_entry(line, gene_id, mrna_id, feat_group_id)
output_handle.write(entry + "\n")
else:
raise Exception("no mRNA")
else:
print(f"missing mRNA form {feat_group_id}")
raise NoMRNAFeature(feat_group_id)
if "CDS" in feat_group:
line = feat_group["CDS"][0]
cds_id = feat_group_id + ".CDS"
entry = clean_entry(line, mrna_id, cds_id, feat_group_id)
output_handle.write(entry + "\n")
else:
raise NoCDSFeature(feat_group_id)
if "promoter" in feat_group:
for i, line in enumerate(feat_group["promoter"]):
if i == 0:
trna_id = feat_group_id + ".promoter"
else:
trna_id = feat_group_id + ".promoter." + str(i + 1)
entry = clean_entry(line, mrna_id, trna_id, feat_group_id)
output_handle.write(entry + "\n")
if "TSS" in feat_group:
for i, line in enumerate(feat_group["TSS"]):
if i == 0:
tss_id = feat_group_id + ".TSS"
else:
tss_id = feat_group_id + ".TSS." + str(i + 1)
entry = clean_entry(line, mrna_id, tss_id, feat_group_id)
output_handle.write(entry + "\n")
def clean_entry_no_parent(line, feat_id, locus_tag):
clean_line = []
columns = line.split("\t")
features_str = columns[-1]
features = features_str.split(";")
for feat in features:
if feat.startswith("ID=") or feat.startswith("locus_tag="):
continue
else:
clean_line.append(feat)
clean_line.append("ID=" + feat_id)
clean_line.append("locus_tag=" + locus_tag)
columns[-1] = ";".join(clean_line)
return "\t".join(columns)
def clean_entry(line, parent_id, feat_id, locus_tag):
clean_line = []
columns = line.split("\t")
features_str = columns[-1]
features = features_str.split(";")
for feat in features:
if (
feat.startswith("Parent=")
or feat.startswith("ID=")
or feat.startswith("locus_tag=")
):
continue
else:
clean_line.append(feat)
clean_line.append("ID=" + feat_id)
clean_line.append("locus_tag=" + locus_tag)
if parent_id:
clean_line.append("Parent=" + parent_id)
columns[-1] = ";".join(clean_line)
return "\t".join(columns)
def add_feature_parent(input_path, feature_index, output_dir):
filename = input_path.stem
ext = input_path.suffix
with open(
Path(output_dir) / (filename + "-with-parent" + ext), "w"
) as output_handle:
with input_path.open() as input_gff:
for line in input_gff:
line = line.strip()
if line.startswith(">"):
break
if line.startswith("#"):
output_handle.write(line + "\n")
else:
columns = line.split("\t")
feature_name = columns[2]
# link TSS
if feature_name == "TSS":
# print(line)
feature_str = columns[-1]
try:
feat_id = get_feature_id(feature_str)
trim_feat_id = get_trimmed_id(feat_id)
features = feature_index[trim_feat_id]
# print(trim_feat_id)
# print(features)
if "mRNA" in features:
output_handle.write(
line
+ ";Parent="
+ feature_index[trim_feat_id]["mRNA"]["id"]
+ "\n"
)
else:
output_handle.write(line + "\n")
except NoFeatureId:
# try to get parent id
parent_id = get_parent_id(feature_str)
trim_parent_id = get_trimmed_id(parent_id)
parent_features = feature_index[trim_parent_id]
if "TSS" in parent_features:
output_handle.write(
line
+ ";ID="
+ parent_features["TSS"]["id"]
+ ".1"
+ "\n"
)
else:
output_handle.write(
line + ";ID=" + trim_parent_id + ".TSS" + "\n"
)
# print(trim_id)
# gene_id = feature_index[trim_id]["gene"]
# output_handle.write(line + ";Parent=" + gene_id + "\n")
# print("tot")
else:
output_handle.write(line + "\n")
# feature_column =
def get_parent_id(features_str):
features = features_str.split(";")
for feat in features:
if feat.startswith("Parent="):
return feat.split("=")[1]
# return None
raise NoFeatureId(f"no parent for {features_str}")
def index_features(input_path):
feature_index = dict()
def fill_index(feat_group_id, feature_name, line):
if feat_group_id:
if feat_group_id in feature_index:
feat_group = feature_index[feat_group_id]
if feature_name in feat_group:
features = feat_group[feature_name]
features.append(line)
else:
feat_group[feature_name] = [line]
# feat_group.update(
# {feature_name: {feat_id: {"id": feat_id, "line": line}}}
# )
else:
feature_index.update({feat_group_id: {feature_name: [line]}})
with input_path.open() as input_gff:
for line in input_gff:
line = line.strip()
if line.startswith(">"):
break
if not line.startswith("#"):
columns = line.split("\t")
if len(columns) == 9:
feature_name = columns[2]
features_str = columns[-1]
feat_group_id = None
try:
feat_group_id = get_locus_tag(features_str)
fill_index(feat_group_id, feature_name, line)
except NoLocusTag:
try:
feat_id = get_feature_id(features_str)
feat_group_id = get_trimmed_id(feat_id)
fill_index(feat_group_id, feature_name, line)
continue
except NoFeatureId:
parent_id = get_parent_id(features_str)
feat_group_id = get_trimmed_id(parent_id)
fill_index(feat_group_id, feature_name, line)
continue
else:
raise Exception(
f"Column lenght not expected. should be 9. Got {len(columns)} column for line : {line}"
)
# print(feature_index)
return feature_index
def get_locus_tag(features_str):
features = features_str.split(";")
for feat in features:
if feat.startswith("locus_tag="):
return feat.split("=")[1]
# return None
raise NoLocusTag(f"no locus tag for: {features_str}")
def get_feature_id(features_str):
features = features_str.split(";")
for feat in features:
if feat.startswith("ID="):
return feat.split("=")[1]
# return None
raise NoFeatureId(f"no id for {features_str}")
def get_trimmed_id(feat_id):
p = re.compile("([^.]+)[.]*[^.]*")
m = p.match(feat_id)
return m.group(1)
if __name__ == "__main__":
link_features()
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment