Commit cc574d80 authored by Yoann Dufresne's avatar Yoann Dufresne
Browse files

generating and ordering d_graphs

parent aa076854
import networkx as nx
import math
from functools import total_ordering
@total_ordering
class Dgraph(object):
    """A candidate d-graph made of two halves and a connectivity score.

    Instances are ordered by the tuple ``(link ratio, optimal score)``, so
    sorting a list of d-graphs puts those whose score is closest to the
    optimal score first.  Because ``__eq__`` is overridden without
    ``__hash__``, instances are not hashable.
    """

    def __init__(self):
        super(Dgraph, self).__init__()
        # Number of (node1, node2) pairs, node1 in the first half and node2 in
        # the second half, that are the same node or adjacent in the graph.
        self.score = 0
        # The two halves of the d-graph, each one a list of nodes.
        self.halves = [[], []]

    def put_halves(self, h1, h2, graph):
        """Store the two halves and compute the d-graph quality (score).

        The score counts every pair (node1, node2), with node1 from ``h1``
        and node2 from ``h2``, where both are the same node or node2 is a
        neighbor of node1 in ``graph``.

        @param h1 First half of the d-graph
        @param h2 Second half of the d-graph
        @param graph The connectivity graph (must provide ``neighbors(node)``)
        """
        self.score = 0
        self.halves[0] = h1
        self.halves[1] = h2

        # Compute link arities
        for node1 in h1:
            # Build the neighbor set once per node for O(1) membership tests.
            neighbors = set(graph.neighbors(node1))
            for node2 in h2:
                if node1 == node2 or node2 in neighbors:
                    self.score += 1

    def get_link_ratio(self):
        """Return the distance between score/optimal_score and 1.0.

        0 means the score equals the optimal score.  When the optimal score
        is 0 (largest half has at most one node) the ratio is undefined; it
        is reported as 0.0 if the score is also 0 and as infinity otherwise,
        instead of raising ZeroDivisionError.
        """
        optimal = self.get_optimal_score()
        if optimal == 0:
            return 0.0 if self.score == 0 else float("inf")
        return abs((self.score / optimal) - 1)

    def get_optimal_score(self):
        """Return n * (n - 1) / 2 where n is the size of the largest half."""
        max_len = max(len(self.halves[0]), len(self.halves[1]))
        return max_len * (max_len - 1) / 2

    def _sort_key(self):
        """Return the tuple used by every comparison operator."""
        return (self.get_link_ratio(), self.get_optimal_score())

    def __eq__(self, other):
        if not isinstance(other, Dgraph):
            # Let Python try the reflected operation / identity fallback.
            return NotImplemented
        return self._sort_key() == other._sort_key()

    def __ne__(self, other):
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __lt__(self, other):
        if not isinstance(other, Dgraph):
            return NotImplemented
        return self._sort_key() < other._sort_key()

    def __repr__(self):
        return "{}/{} {}{}".format(
            self.score, self.get_optimal_score(), self.halves[0], self.halves[1]
        )
def compute_all_max_d_graphs(graph, n_best=10, max_overlap=2):
    """ From a barcode graph, compute the best candidate d-graphs for each node.

    For every node, the maximal cliques of the subgraph induced by its
    neighbors are used as candidate halves.  Every pair of distinct cliques
    is scored with Dgraph.put_halves, and a pair is kept only when
    optimal_score / 2 <= score < 1.5 * optimal_score.

    @param graph A barcode graph
    @param n_best Only keep the n first d-graphs of each sorted list (the ones
        whose score/optimal ratio is the nearest to 1.0). None keeps them all.
    @param max_overlap Currently unused; kept for interface compatibility.
    @return A dictionary associating each node to its list of candidate
        d-graphs, sorted in ascending Dgraph order (smallest link ratio first).
    """
    d_graphes = {}

    for node in graph.nodes():
        neighbors = list(graph.neighbors(node))
        # Independent copy of the neighborhood so it can be analysed alone.
        neighbors_graph = nx.Graph(graph.subgraph(neighbors))

        # Find all the cliques (equivalent to compute all the candidate half d-graph)
        cliques = list(nx.find_cliques(neighbors_graph))

        # Pair halves to create d-graphes
        node_d_graphes = []
        for idx, clq1 in enumerate(cliques):
            for clq2 in cliques[idx + 1:]:
                d_graph = Dgraph()
                d_graph.put_halves(clq1, clq2, neighbors_graph)
                optimal_score = d_graph.get_optimal_score()

                # Require a minimal connection between the halves and reject
                # pairs scoring too high (too many shared nodes).
                if d_graph.score < optimal_score / 2 or d_graph.score >= 1.5 * optimal_score:
                    continue

                node_d_graphes.append(d_graph)

        # Cut the distribution queue: keep only the n_best candidates.
        node_d_graphes.sort()
        if n_best is not None:
            node_d_graphes = node_d_graphes[:max(n_best, 0)]
        d_graphes[node] = node_d_graphes

    return d_graphes
......@@ -248,6 +248,8 @@ def filter_d_graphs(candidates, max_overlap=0):
return filtered, unpartitionned
import d_graph as dg
def main():
# Parsing the input file
filename = sys.argv[1]
......@@ -256,16 +258,21 @@ def main():
G = nx.read_graphml(filename)
elif filename.endswith('.gexf'):
G = nx.read_gexf(filename)
dgraphs = dg.compute_all_max_d_graphs(G)
for node in list(G.nodes())[:10]:
print(node, dgraphs[node])
print("...")
# Deconvolve
g_nodes = list(G.nodes())
for node in g_nodes:
local_deconvolve(G,node, verbose=2 if (node.startswith("0:")) else 0)
# exit()
# # Deconvolve
# g_nodes = list(G.nodes())
# for node in g_nodes:
# local_deconvolve(G,node, verbose=2 if (node.startswith("0:")) else 0)
# # exit()
print(len(g_nodes), "->", len(list(G.nodes())))
# print(len(g_nodes), "->", len(list(G.nodes())))
nx.write_graphml(G,sys.argv[1]+".deconvolved.graphml")
# nx.write_graphml(G,sys.argv[1]+".deconvolved.graphml")
if __name__ == "__main__":
main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment