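"""Maintenance CLI for the DefenseFinder wiki content.

Commands defined below: lint article front matter, download structure files
and generate PAE plots, build a JSON index of systems, extract operon
structures from Defense Finder model definitions, sanitize RefSeq statistics
tables, and rewrite article markdown sections.
"""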
import csv
import json
import os
import re
import sys
import tarfile
import tempfile
import xml.etree.ElementTree as ET
from enum import Enum
from operator import itemgetter
from pathlib import Path
from typing import List, Optional

import frontmatter
import matplotlib.pyplot as plt
import pandas as pd
import requests
import typer
from Bio.PDB import MMCIFIO, PDBParser
from pandas.errors import ParserError
from pydantic import BaseModel, ValidationError
from rich.console import Console
from typing_extensions import Annotated
console = Console()
app = typer.Typer()
class LayoutEnum(str, Enum):
article = "article"
db = "db"
class TableArticle(BaseModel):
doi: str
class TableColumns(BaseModel):
article: TableArticle
Sensor: Optional[str] = None
Activator: Optional[str] = None
Effector: Optional[str] = None
PFAM: Optional[str] = None
class RelevantAbstract(BaseModel):
doi: str
class FrontMatter(BaseModel):
title: str
layout: LayoutEnum
tableColumns: TableColumns
relevantAbstracts: List[RelevantAbstract]
contributors: List[str]
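# Illustrative front matter that satisfies the FrontMatter model above
# (all values are hypothetical):
#
#   ---
#   title: AbiEii
#   layout: article
#   tableColumns:
#     article:
#       doi: 10.1000/xyz123
#     Sensor: unknown
#     PFAM: PF08843
#   relevantAbstracts:
#     - doi: 10.1000/xyz123
#   contributors:
#     - Jane Doe
#   ---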
@app.command()
def lint(
file: Annotated[
Path,
typer.Option(
            exists=True,
            file_okay=True,
            readable=True,
),
],
):
console.rule(f"[bold blue]{file.name}", style="blue")
with open(file) as f:
metadata, _ = frontmatter.parse(f.read())
try:
FrontMatter.model_validate(metadata)
except ValidationError as exc:
for err in exc.errors():
console.print(
f"[red]{err['msg']} : {err['type']} {' -> '.join([str(l) for l in err['loc']])}"
)
sys.exit(1)
else:
console.print("[green] Everything is alright")
@app.command(
    help="Download all structures listed in the statistics file, generate PAE plots from the raw data, and organize the files per system"
)
def structure(
user: Annotated[str, typer.Option(help="username credential")],
password: Annotated[str, typer.Option(help="password credential")],
stat: Annotated[
Path,
typer.Option(
exists=True,
file_okay=True,
            dir_okay=False,
writable=False,
readable=True,
resolve_path=True,
),
],
output: Annotated[
Path,
typer.Option(
exists=False,
file_okay=False,
dir_okay=True,
writable=True,
readable=True,
resolve_path=True,
),
],
):
with open(stat, "r") as stat_f:
reader = csv.DictReader(stat_f, delimiter="\t")
count_row = 0
for row in reader:
count_row += 1
system_dir_name = row["system"].lower()
pdb_path_file = Path(row["pdb"])
            foldseek_html_file = pdb_path_file.with_suffix(".html")
            png_structure = pdb_path_file.with_suffix(".png")
files = [
{"f": pdb_path_file, "d": "PDB"},
{"f": Path(row["pae_table"]), "d": "PAE"},
{"f": Path(row["fasta_file"]), "d": "Fastas"},
{"f": foldseek_html_file, "d": "foldseek_pdb_html"},
{"f": png_structure, "d": "png"},
]
target_dir = output / system_dir_name
target_dir.mkdir(parents=True, exist_ok=True)
base_url = "https://data.atkinson-lab.com/DefenceFinder"
console.rule(f"[bold blue]{count_row} / {system_dir_name}", style="blue")
for f in files:
console.print(f"[bold blue]{f['d']}", style="blue")
str_f = str(f["f"])
                if str_f not in ("", ".", "na"):
# get the file from atkinson lab
target_file = target_dir / f["f"]
file_to_fetch = f"{base_url}/{f['d']}/{f['f']}"
console.print(f"Fetch : {file_to_fetch}")
                    response = requests.get(
                        file_to_fetch,
                        auth=(user, password),
                        allow_redirects=True,
                        stream=True,
                    )
                    response.raise_for_status()
with open(target_file, "wb") as fh:
for chunk in response.iter_content(chunk_size=1024):
if chunk:
fh.write(chunk)
if f["d"] == "PDB":
# pdb2cif
pdb2cif(target_file)
if f["d"] == "PAE":
png_file = str(target_file).split(".tsv")[0] + ".pae.png"
try:
pae2png(target_file, png_file)
except ParserError as err:
                            console.print(
                                f"[red]file {target_file.name} cannot be parsed"
                            )
                            console.print(err)
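# Illustrative invocation (credentials and paths are hypothetical):
#   python main.py structure --user alice --password secret \
#       --stat all_predictions_statistics.tsv --output public/structures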
@app.command()
def systems(
dir: Annotated[
Path,
        typer.Option(exists=True, file_okay=False, readable=True, dir_okay=True),
],
pfam: Annotated[
Path,
typer.Option(
            exists=True,
            file_okay=True,
            readable=True,
),
],
output: Annotated[
Path,
typer.Option(
file_okay=True,
dir_okay=False,
writable=True,
resolve_path=True,
),
],
):
with open(pfam, "r") as pf:
pfam_df = pd.read_csv(pf, index_col="AC", keep_default_na=False)
systems = []
if output.exists():
output.unlink()
with open(output, "a") as ty:
for file in dir.iterdir():
if file.suffix == ".md":
console.rule(f"[bold blue]{file.name}", style="blue")
with open(file) as f:
metadata, _ = frontmatter.parse(f.read())
del metadata["layout"]
sanitizedMetadata = {**metadata}
if "tableColumns" in sanitizedMetadata:
table_data = sanitizedMetadata["tableColumns"]
if "PFAM" in table_data:
                                pfams_list = [
                                    ac.strip() for ac in table_data["PFAM"].split(",")
                                ]
                                pfam_metadata = []
                                # pfam_ac avoids shadowing the `pfam` Path option
                                for pfam_ac in pfams_list:
                                    try:
                                        pfam_obj = pfam_df.loc[[pfam_ac]]
                                        pfam_to_dict = pfam_obj.to_dict(orient="index")
                                        pfam_to_dict[pfam_ac]["AC"] = pfam_ac
                                        pfam_metadata.append(pfam_to_dict[pfam_ac])
                                    except KeyError as err:
                                        console.print(f"[bold red]{err}", style="red")
                                        console.print(
                                            f"[bold red]No PFAM entry for {pfam_ac}",
                                            style="red",
                                        )
                                        continue
                                sanitizedMetadata["PFAM"] = pfam_metadata
if "article" in table_data:
sanitizedMetadata["doi"] = table_data["article"]["doi"]
if "abstract" in table_data["article"]:
sanitizedMetadata["abstract"] = table_data[
"article"
]["abstract"]
del table_data["article"]
if "PFAM" in table_data:
del table_data["PFAM"]
del sanitizedMetadata["tableColumns"]
sanitizedMetadata = {**sanitizedMetadata, **table_data}
systems.append(sanitizedMetadata)
json_object = json.dumps(systems, indent=2)
ty.write(json_object)
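# Illustrative invocation (paths are hypothetical); writes a JSON array in
# which each entry holds the sanitized front matter of one article:
#   python main.py systems --dir content/ --pfam pfam-a.csv --output systems.json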
@app.command()
def system_operon_structure(
output: Annotated[
Path,
typer.Option(
file_okay=True,
dir_okay=False,
writable=True,
resolve_path=True,
),
] = "./system-structures.csv",
structure: Annotated[
Path,
typer.Option(
file_okay=True,
dir_okay=False,
writable=False,
resolve_path=True,
),
] = "./all_predictions_statistics.tsv",
    versions: Annotated[
        List[str],
        typer.Option(help="Defense Finder model versions (used in the archive filename)"),
    ] = [
        "1.2.2",
        "1.2.3",
        "v1.2.4",
    ],
    tags: Annotated[
        List[str], typer.Option(help="Defense Finder model release tags")
    ] = [
        "1.2.2",
        "1.2.3",
        "1.2.4",
    ],
):
# get defense finder model from github
releases = zip(versions, tags)
model_dirs = list(download_model_release(releases))
systems = []
with open(structure) as tsvfile:
tsvreader = csv.DictReader(tsvfile, delimiter="\t")
for row in tsvreader:
systems.append({"system": row["system"], "subsystem": row["subsystem"]})
    system_genes = []
    system_genes_got = set()
    # enumerate model definition paths once instead of once per system
    list_paths = list(gen_model_path(model_dirs))
    for system_def in systems:
        system, subsystem = itemgetter("system", "subsystem")(system_def)
        system_id = f"{system}-{subsystem}"
if (
system != "#N/A"
and subsystem != "#N/A"
and system_id not in system_genes_got
):
system_genes_got.add(system_id)
def_path = find_model_definition(system, subsystem, list_paths)
in_exchangeables = False
current_gene = {}
exchangeables = []
with open(def_path["path"]) as file:
for event, elem in ET.iterparse(file, events=("start", "end")):
if event == "start":
if (
elem.tag == "gene"
and not in_exchangeables
and elem.attrib["presence"] != "forbidden"
):
current_gene = {
"system": system,
"subsystem": subsystem,
"gene": elem.attrib["name"],
"version": def_path["version"],
"exchangeables": None,
}
system_genes.append(current_gene)
if elem.tag == "gene" and in_exchangeables:
exchangeables.append(elem.attrib["name"])
if elem.tag == "exchangeables":
in_exchangeables = True
exchangeables = []
elif event == "end":
if elem.tag == "exchangeables":
in_exchangeables = False
current_gene["exchangeables"] = ",".join(exchangeables)
exchangeables = []
with open(output, "w") as f:
fieldnames = ["id", "system", "subsystem", "version", "gene", "exchangeables"]
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
        for idx, gene in enumerate(system_genes):
            gene["id"] = idx
            writer.writerow(gene)
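# Illustrative invocation, relying on the option defaults declared above
# (pass --versions/--tags once per value to override them):
#   python main.py system-operon-structure --versions 1.2.2 --tags 1.2.2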
def download_model_release(releases):
    for version, tag in releases:
        df_model_url = f"https://github.com/mdmparis/defense-finder-models/releases/download/{tag}/defense-finder-models-{version}.tar.gz"
        fd, tmp_path = tempfile.mkstemp()
        os.close(fd)  # only the path is needed; avoid leaking the descriptor
        tmp_root_dir = tempfile.gettempdir()
        df_model_dir = Path(f"{tmp_root_dir}/defense-finder-models-{version}")
df_model_definitions_dir = (
df_model_dir / "defense-finder-models" / "definitions"
)
console.print(f"Download models: {df_model_url}")
        response = requests.get(
            df_model_url,
            allow_redirects=True,
        )
        response.raise_for_status()
        with open(tmp_path, mode="wb") as file:
            file.write(response.content)
console.print("untar file")
with tarfile.open(tmp_path) as archive:
archive.extractall(df_model_dir)
yield {"version": tag, "dir": df_model_definitions_dir}
TMP_CIF = """
#
loop_
_ma_qa_metric.id
_ma_qa_metric.mode
_ma_qa_metric.name
_ma_qa_metric.software_group_id
_ma_qa_metric.type
1 global pLDDT 1 pLDDT
2 local pLDDT 1 pLDDT
#
_ma_qa_metric_global.metric_id 1
_ma_qa_metric_global.metric_value {:.06}
_ma_qa_metric_global.model_id 1
_ma_qa_metric_global.ordinal_id 1
#
loop_
_ma_qa_metric_local.label_asym_id
_ma_qa_metric_local.label_comp_id
_ma_qa_metric_local.label_seq_id
_ma_qa_metric_local.metric_id
_ma_qa_metric_local.metric_value
_ma_qa_metric_local.model_id
_ma_qa_metric_local.ordinal_id
"""
def pdb2cif(pdb):
    cif = str(Path(pdb).with_suffix(".cif"))
console.print(f"convert {pdb} -> {cif}")
p = PDBParser()
struc = p.get_structure("", pdb)
list_atoms = []
for a in struc.get_atoms():
list_atoms.append(
[
a.parent.parent.id,
a.parent.resname,
a.parent.id[1],
"2",
a.bfactor,
1,
a.parent.id[1],
]
)
df = pd.DataFrame(list_atoms).drop_duplicates()
with open(cif, "w") as of:
of.write(TMP_CIF.format(df[4].mean()))
df.to_csv(cif, index=False, header=False, mode="a", sep=" ")
io = MMCIFIO()
io.set_structure(struc)
with open(cif, "a") as of:
io.save(of)
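# Note on the output layout: the QA-metric loops are written first and the
# coordinate data block (appended by MMCIFIO) follows; downstream structure
# viewers are assumed to accept this ordering.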
def pae2png(tsv_file, png_file):
console.print(f"Convert : {tsv_file} row data to {png_file}")
v = pd.read_table(tsv_file, index_col=0, low_memory=False)
fig, ax = plt.subplots(1, 1, figsize=(4, 3), facecolor=None)
m = ax.matshow(v, cmap="Greens_r", vmin=0, vmax=35, aspect="auto", origin="lower")
cbar = plt.colorbar(m, ax=ax, fraction=0.046, pad=0.04)
ax.set_xlabel("Scored Residues")
ax.set_ylabel("Aligned Residues")
cbar.set_label("Expected Position Error (Å)")
plt.tight_layout()
plt.savefig(png_file, dpi=150, facecolor=None, transparent=True)
plt.close()
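# Minimal standalone usage sketch (file names are hypothetical):
#   pae2png("abieii.pae.tsv", "abieii.pae.png")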
@app.command(help="Remove version from sys_id")
def refseq(
input: Annotated[
Path,
typer.Option(
            exists=True,
            file_okay=True,
            readable=True,
),
],
output: Annotated[
Path,
typer.Option(
file_okay=True,
dir_okay=False,
writable=True,
resolve_path=True,
),
],
):
with open(output, "w") as out, open(input, "r") as refseq_f:
reader = csv.DictReader(refseq_f)
fieldnames = reader.fieldnames
writer = csv.DictWriter(out, fieldnames=fieldnames)
writer.writeheader()
for row in reader:
if row["sys_id"] == "":
row["sys_id"] = f'{row["Assembly"]}_{row["replicon"]}'
result = re.sub(r"^(\w+)\.\d+(_.*)$", r"\1\2", row["sys_id"])
console.print(f"[green]{row['sys_id']} -> {result}")
row["sys_id"] = result
writer.writerow(row)
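# Only the version suffix on the accession is stripped, e.g. a hypothetical
# sys_id "GCF_003931565.1_NZ_CP034691" becomes "GCF_003931565_NZ_CP034691".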
@app.command(
    help='Remove "No system found" hits if they are not the only hit for an assembly'
)
def refseq_sanitized_hits(
input: Annotated[
Path,
typer.Option(
            exists=True,
            file_okay=True,
            readable=True,
),
],
output: Annotated[
Path,
typer.Option(
file_okay=True,
dir_okay=False,
writable=True,
resolve_path=True,
),
],
):
df = pd.read_csv(input)
df_final = _sanitized_refseq_hits(df)
df_final.reset_index().to_csv(output, index=False)
return df_final
@app.command(help="Group hits per assembly and types (from 'sanitized-hits')")
def refseq_group_per_assembly_and_type(
input: Annotated[
Path,
typer.Option(
            exists=True,
            file_okay=True,
            readable=True,
),
],
output: Annotated[
Path,
typer.Option(
file_okay=True,
dir_okay=False,
writable=True,
resolve_path=True,
),
],
):
df = pd.read_csv(input)
df_final = _sanitized_refseq_hits(df)
df_final_grouped = df_final.groupby(
[
"Assembly",
"type",
"Superkingdom",
"phylum",
"class",
"order",
"family",
"genus",
"species",
],
as_index=False,
dropna=False,
).size()
df_final_grouped.reset_index().to_csv(output, index=False)
@app.command()
def refseq_group_per_assembly(
input: Annotated[
Path,
typer.Option(
            exists=True,
            file_okay=True,
            readable=True,
),
],
output: Annotated[
Path,
typer.Option(
file_okay=True,
dir_okay=False,
writable=True,
resolve_path=True,
),
],
):
df = pd.read_csv(input)
df["Assembly"] = df["Assembly"].apply(remove_version)
df_grouped = df.groupby(
[
"Assembly",
"Superkingdom",
"phylum",
"class",
"order",
"family",
"genus",
"species",
],
as_index=False,
dropna=False,
).size()
df_grouped.reset_index().to_csv(output, index=False)
@app.command()
def refseq_type_count(
input: Annotated[
Path,
typer.Option(
            exists=True,
            file_okay=True,
            readable=True,
            help="csv file with type and taxonomy ('No system found' rows removed when other systems are found in the same assembly)",
),
],
output: Annotated[
Path,
typer.Option(
file_okay=True,
dir_okay=False,
writable=True,
resolve_path=True,
),
],
):
df = pd.read_csv(input)
grouped_per_type = df.groupby(["type"], as_index=False, dropna=False).size()
grouped_per_type.reset_index().to_csv(output, index=False)
@app.command()
def markdown(
dir: Annotated[
Path,
typer.Option(
exists=True,
file_okay=False,
writable=True,
readable=True,
resolve_path=True,
help="Dir where all systems article are",
),
],
):
for file in dir.iterdir():
if file.suffix == ".md":
console.rule(f"[bold blue]{file.name}", style="blue")
            # make a temporary copy of the file
            fd, tmp_path = tempfile.mkstemp()
            os.close(fd)  # only the path is needed
            dst = Path(tmp_path)
            dst.write_bytes(file.read_bytes())
            # check that the article has the "## Structure", "## Experimental validation",
            # and "## Distribution of the system among prokaryotes" sections
            with open(dst, "r") as f:
all_file = f.read()
if (
re.search(
r"#{2}\s+Structure",
all_file,
flags=re.IGNORECASE | re.MULTILINE,
)
and re.search(
r"#{2}\s+Experimental\s+validation",
all_file,
flags=re.IGNORECASE | re.MULTILINE,
)
and re.search(
r"#{2}\s+Distribution\s+of\s+the\s+system\s+among\s+prokaryotes",
all_file,
flags=re.IGNORECASE | re.MULTILINE,
)
):
new_f_str = re.sub(
r"#{2}\s+Structure.*?#{2}\s+Experimental\s+validation",
"## Structure\n\n::article-structure\n::\n\n## Experimental validation",
all_file,
flags=re.DOTALL | re.IGNORECASE,
)
new_f = re.sub(
r"#{2}\s+Distribution\s+of\s+the\s+system\s+among\s+prokaryotes.*?#{2}\s+Structure",
"## Distribution of the system among prokaryotes\n\n::article-system-distribution-plot\n::\n\n## Structure",
new_f_str,
flags=re.DOTALL | re.IGNORECASE,
)
with open(file, "w") as f_out:
f_out.write(new_f)
else:
console.log(f"[bold red]check it manually")
def remove_version(assembly):
return assembly.split(".")[0]
def _sanitized_refseq_hits(df):
df["Assembly"] = df["Assembly"].apply(remove_version)
    # Lower-case type names (currently disabled)
    # df["type"] = df["type"].apply(lambda x: x.lower())
# Get all row with no system type
df_no_system = df.loc[df["type"] == "No system found"]
# unique assembly with no sys
serie_assembly_with_no_sys = df_no_system["Assembly"].unique()
# filter assembly to have those with no sys
df_with_no_sys = df[df["Assembly"].isin(serie_assembly_with_no_sys)]
# Group them by assembly, type, taxo
no_sys_assembly_by_size = df_with_no_sys.groupby(
[
"Assembly",
"type",
"Superkingdom",
"phylum",
"class",
"order",
"family",
"genus",
"species",
],
as_index=False,
dropna=False,
).size()
# count each occurrence
df_again_per_assembly = no_sys_assembly_by_size.groupby(
"Assembly", as_index=False, dropna=False
).size()
    # keep only assemblies with size > 1; when size == 1 the assembly's only
    # hit is "No system found", so that row must be kept
df_size_sup_1 = df_again_per_assembly[df_again_per_assembly["size"] > 1]
assembly_where_should_remove_no_sys_found = df_size_sup_1["Assembly"].unique()
# Construct new dataset to remove entries with no system found
# while found system on other replicon that belongs to the
# same assembly
df_filtered_assembly_only_with_sys = df[
(df["type"] != "No system found")
| ~df.Assembly.isin(assembly_where_should_remove_no_sys_found)
]
return df_filtered_assembly_only_with_sys
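# Sketch of the intended behaviour on a toy table (values are hypothetical):
#
#   >>> toy = pd.DataFrame(
#   ...     {"Assembly": ["A1.1", "A1.1", "A2.1"],
#   ...      "type": ["CBASS", "No system found", "No system found"],
#   ...      "Superkingdom": "Bacteria", "phylum": "p", "class": "c",
#   ...      "order": "o", "family": "f", "genus": "g", "species": "s"})
#   >>> sorted(_sanitized_refseq_hits(toy)["type"])
#   ['CBASS', 'No system found']
#
# A1 has a real system, so its "No system found" row is dropped; A2 has
# nothing else, so its row is kept.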
def find_model_definition(system, subsystem, list_paths):
found_path = None
for p in list_paths:
path = p["path"]
parts = path.parts
if path.stem == subsystem and parts[-2] == system:
console.rule(f"{system} - {subsystem}")
console.print(p)
found_path = {"path": path, "version": p["version"]}
break
    if found_path is None:
        raise FileNotFoundError(f"no model definition found for {system}/{subsystem}")
    return found_path
def gen_model_path(model_dirs):
for model_dir in model_dirs:
for subdir in model_dir["dir"].iterdir():
for system_path in subdir.iterdir():
for subsystem_path in system_path.iterdir():
                    if subsystem_path.suffix == ".xml":
yield {"path": subsystem_path, "version": model_dir["version"]}