#!/usr/bin/env python3
take a D2 graph generated from a graph where we know the molecule positions on the genome
compute quality metrics on this d2 graph
import sys
import argparse
from termcolor import colored
import networkx as nx
def parse_args():
parser = argparse.ArgumentParser(description='Process some integers.')
parser.add_argument('filename', type=str,
help='The file to evalute')
args = parser.parse_args()
return args
def load_graph(filename):
if filename.endswith('.graphml'):
return nx.read_graphml(filename)
elif filename.endswith('.gexf'):
return nx.read_gexf(filename)
print("Wrong file format. Require graphml or gefx format", file=sys.stderr)
""" return a random path in G starting in u and having n _edges_ """
import random
def findRandomPath(G,u,n,previous_path_nodes=set()):
if n==0:
return [u]
poss_neigh = list(set(G.neighbors(u)) - previous_path_nodes)
if len(poss_neigh) == 0: return None
neighbor = random.choice(poss_neigh)
new_previous_path_nodes = previous_path_nodes | set([u])
path = findRandomPath(G,neighbor,n-1, new_previous_path_nodes)
if path is None: return None
if len(path) != n: return None
return [u]+path
import itertools
""" determine, given an ordered list of central nodes, whether there exists a coherent paths of overlapping molecules in them """
def is_there_path_acc(mols_in_central_nodes,min_overlap_length=5000): # don't consider overlaps smaller than 5kbp
for mols in itertools.product(*mols_in_central_nodes):
last_end = None
last_start = None
good_path = True
for mol in sorted(mols):
start,end = mol
if last_end is None:
last_end = end
if last_start is None:
last_start = start
if not (start >= last_start and start <= last_end-min_overlap_length):
#print("bad path",mols)
good_path = False
last_end = end
last_start = start
if good_path:
return True
return False
""" converts a central node of a d-graph into its list of molecules (given the ground truth) """
def central_node_to_molecules(nodestr):
# format for a 2-merge: 1:NC_000913.3_298281_313280_0:0:0_0:0:0_2fb/1_NC_000913.3_338611_353610_0:0:0_0:0:0_37b/1
cur_node_mols = []
for ide in nodestr.split('NC_')[1:]: # specific to e coli
x = ide.split("_")
start, end = int(x[1]), int(x[2])
cur_node_mols += [(start,end)]
return cur_node_mols
def is_coherent_path(central_nodes,path_len):
mols_in_central_nodes = []
for node in central_nodes:
cur_node_mols = central_node_to_molecules(node)
mols_in_central_nodes += [cur_node_mols]
assert(len(mols_in_central_nodes) == path_len+1)
return is_there_path_acc(mols_in_central_nodes)
""" the main function that tests for accuracy of random paths """
graph = None
def evaluate_accuracy_paths(path_len,max_paths_per_node=100):
global graph
nb_bad_paths = 0
nb_good_paths = 0
for node in graph.nodes():
nb_paths = 0
seen_paths = set()
for _ in range(max_paths_per_node):
path = findRandomPath(graph,node,path_len)
if path is None: continue
if tuple(sorted(path)) in seen_paths: continue # avoids looking at the same path twice
assert(len(path) == path_len+1)
central_nodes = [graph.nodes[x]['udg'].split()[0] for x in path]
assert(len(central_nodes) == path_len+1)
if is_coherent_path(central_nodes,path_len):
nb_good_paths += 1
nb_bad_paths += 1
print("accuracy for l=%d:" % path_len,nb_good_paths / (nb_good_paths + nb_bad_paths))
# ---- sensitivity evaluation
""" given an ordered list of molecules, determine if the graph contains a path of central nodse which have these molecules.
it does that by testing all possible combinations of d-graphs having those molecules in their central nodes """
def is_there_path(graph,molecules_to_nodes,sought_path):
possible_central_nodes = []
for mol in sought_path:
possible_central_nodes += [molecules_to_nodes[mol]]
for mols in itertools.product(*possible_central_nodes):
if nx.is_connected(graph.subgraph(mols)):
#print("found connected path",mols)
return True
#print("found no connected paths",sought_path)
return False
def evaluate_sensitivity_paths(path_len,overlap_length=7000):
all_molecules = set()
from collections import defaultdict
molecules_to_nodes = defaultdict(list)
for node in graph.nodes():
central_node = graph.nodes[node]['udg'].split()[0]
mols = central_node_to_molecules(central_node)
all_molecules |= set(mols)
for mol in mols:
molecules_to_nodes[mol] += [node]
all_molecules = sorted(all_molecules)
print("got",len(all_molecules),"molecules total")
nb_found, nb_not_found = 0,0
for i in range(len(all_molecules)-path_len+1):
sought_path = all_molecules[i:i+path_len]
if is_there_path(graph,molecules_to_nodes,sought_path):
nb_found += 1
nb_not_found += 1
print("sensitivity for l=%d" % path_len, nb_found / (nb_not_found + nb_found))
from multiprocessing import Pool
def main():
global graph
args = parse_args()
graph = load_graph(args.filename)
p = Pool(4), [1,2,3,4]), [1,2,3,4])
if __name__ == "__main__":
