diff --git a/deconvolution/d2_graph.py b/deconvolution/d2_graph.py index ba67b395f7346dca835f97dc70a0e906f9658c72..46c68862fef82c9570416f393c0fe46e153cb1aa 100644 --- a/deconvolution/d2_graph.py +++ b/deconvolution/d2_graph.py @@ -1,5 +1,7 @@ import networkx as nx -import d_graph as dgm +import itertools + +from d_graph import compute_all_max_d_graphs class D2Graph(object): @@ -9,7 +11,7 @@ class D2Graph(object): self.graph = graph # Compute all the d-graphs - self.d_graphs = dgm.compute_all_max_d_graphs(self.graph) + self.d_graphs = compute_all_max_d_graphs(self.graph) # Index all the d-graphes self.index = self.create_index() @@ -17,6 +19,33 @@ class D2Graph(object): def create_index(self): index = {} + + perfect = 0 + for node in self.d_graphs: + for dg in self.d_graphs[node]: + lst = dg.to_ordered_lists() + # Generate all dmers without the first node + # pull all the values + concat = [el for l in lst[1:] for el in l] + # generate dmers + for idx in range(len(lst[0])): + dmer = frozenset(concat + lst[0][:idx] + lst[0][idx+1:]) + if not dmer in index: + index[dmer] = [dg] + else: + index[dmer].append(dg) + + # Generate all dmers without the last node + # pull all the values + concat = [el for l in lst[:-1] for el in l] + # generate dmers + for idx in range(len(lst[-1])): + dmer = frozenset(concat + lst[-1][:idx] + lst[-1][idx+1:]) + if not dmer in index: + index[dmer] = [dg] + else: + index[dmer].append(dg) + return index diff --git a/deconvolution/d_graph.py b/deconvolution/d_graph.py index 4ce1185b3e5ca9815a3aa82ed21f2e1705fcdc33..cffa79b28bc56ed0a70b2d330218dcfb1e05a822 100644 --- a/deconvolution/d_graph.py +++ b/deconvolution/d_graph.py @@ -42,7 +42,7 @@ class Dgraph(object): self.halves[1].sort(reverse=True, key=lambda v: connex[1][v]) - def get_link_ratio(self): + def get_link_divergence(self): return abs((self.score / self.get_optimal_score()) - 1) @@ -51,29 +51,35 @@ class Dgraph(object): return max_len * (max_len - 1) / 2 - def to_ordered_list(self): - # TODO : Can't be uniq (see for corrections) - return self.halves[0][::-1] + [self.center] + self.halves[1] + def to_ordered_lists(self): + hands = [[],[]] + for idx in range(2): + prev_connectivity = -1 + for node in self.halves[idx]: + # group nodes by similar connectivity + value = self.connexity[idx][node] + if value != prev_connectivity: + hands[idx].append([]) + prev_connectivity = value + hands[idx][-1].append(node) + + return hands[0][::-1] + [[self.center]] + hands[1] def __eq__(self, other): - my_tuple = (self.get_link_ratio(), self.get_optimal_score()) - other_tuple = (other.get_link_ratio(), other.get_optimal_score()) + my_tuple = (self.get_link_divergence(), self.get_optimal_score()) + other_tuple = (other.get_link_divergence(), other.get_optimal_score()) return (my_tuple == other_tuple) def __ne__(self, other): return not (self == other) def __lt__(self, other): - my_tuple = (self.get_link_ratio(), self.get_optimal_score()) - other_tuple = (other.get_link_ratio(), other.get_optimal_score()) + my_tuple = (self.get_link_divergence(), self.get_optimal_score()) + other_tuple = (other.get_link_divergence(), other.get_optimal_score()) return (my_tuple < other_tuple) - def __hash__(self): - return frozenset(self.to_ordered_list()).__hash__() - - def __repr__(self): # print(self.halves) representation = self.center + " " + str(self.score) + "/" + str(self.get_optimal_score()) + " " diff --git a/deconvolution/deconvolve.py b/deconvolution/deconvolve.py index d6cba263184932f812232b80ffdc6e3427c5ff6a..bcf1e86f9443fe58777090dc4cc504c48bc36826 100755 --- a/deconvolution/deconvolve.py +++ b/deconvolution/deconvolve.py @@ -20,7 +20,7 @@ def main(): G = nx.read_gexf(filename) d2g = d2.D2Graph(G) - d2g.save_to_file("data/d2_graph.gexf") + # d2g.save_to_file("data/d2_graph.gexf") if __name__ == "__main__": main() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..e69de29bb2d1d6434b8b29ae775ad8c2e48c5391 diff --git a/tests/d2_graph_test.py b/tests/d2_graph_test.py new file mode 100644 index 0000000000000000000000000000000000000000..961aa8c3310a2052a7035b4cbd189d70299cc086 --- /dev/null +++ b/tests/d2_graph_test.py @@ -0,0 +1,41 @@ +import unittest + +from d2_graph import D2Graph +from d_graph import Dgraph + +from tests.d_graph_data import unit_d_graph, unit_overlapp_d_graph, complete_graph + + +class TestD2Graph(unittest.TestCase): + def test_construction(self): + d2 = D2Graph(complete_graph) + + # Evaluate the number of candidate unit d_graphs generated + for node, candidates in d2.d_graphs.items(): + if node == "C" or node == "B2": + self.assertEquals(1, len(candidates)) + else: + self.assertEquals(0, len(candidates)) + + # Evaluate the hashes + self.assertEquals(3, len(d2.index)) + + udg = Dgraph(unit_d_graph[0]) + udg.put_halves(unit_d_graph[1], unit_d_graph[2], unit_d_graph[3]) + uodg = Dgraph(unit_overlapp_d_graph[0]) + uodg.put_halves(unit_overlapp_d_graph[1], unit_overlapp_d_graph[2], unit_overlapp_d_graph[3]) + + key = frozenset({'A2', 'A1', 'B1', 'C', 'B0', 'B2'}) + self.assertEquals(2, len(d2.index[key])) + self.assertTrue(udg in d2.index[key]) + self.assertTrue(uodg in d2.index[key]) + key = frozenset({'A0', 'A2', 'A1', 'B1', 'C', 'B2'}) + self.assertEquals(1, len(d2.index[key])) + self.assertEquals(udg, d2.index[key][0]) + key = frozenset({'A2', 'B-1', 'B1', 'C', 'B2', 'B0'}) + self.assertEquals(1, len(d2.index[key])) + self.assertEquals(uodg, d2.index[key][0]) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/d_graph_data.py b/tests/d_graph_data.py new file mode 100644 index 0000000000000000000000000000000000000000..5acdbb2c1ab233b36a41af66baea5df7faf3d9ea --- /dev/null +++ b/tests/d_graph_data.py @@ -0,0 +1,63 @@ +import networkx as nx + +# Describe a unit d-graph with d = 3 centered on the C node + +__h1 = ["A0", "A1", "A2"] +__h2 = ["B0", "B1", "B2"] +# nodes +__G = nx.Graph() +for node in __h1 + __h2: + __G.add_node(node) +# left edges +__G.add_edge("A0", "A1") +__G.add_edge("A2", "A1") +__G.add_edge("A0", "A2") +# right edges +__G.add_edge("B0", "B1") +__G.add_edge("B2", "B1") +__G.add_edge("B0", "B2") +# Transitive edges +__G.add_edge("A1", "B2") +__G.add_edge("A2", "B1") +__G.add_edge("A2", "B2") + + +unit_d_graph = ("C", __h1, __h2, __G) + + + +# Describe a unit d-graph with d = 3 centered on the C node + +__h1 = ["A1", "A2", "C"] +__h2 = ["B0", "B1", "B-1"] +# nodes +__G = nx.Graph() +for node in __h1 + __h2: + __G.add_node(node) +# left edges +__G.add_edge("C", "A1") +__G.add_edge("A2", "A1") +__G.add_edge("C", "A2") +# right edges +__G.add_edge("B0", "B1") +__G.add_edge("B-1", "B1") +__G.add_edge("B0", "B-1") +# Transitive edges +__G.add_edge("A2", "B1") +__G.add_edge("C", "B1") +__G.add_edge("C", "B0") + + +unit_overlapp_d_graph = ("B2", __h1, __h2, __G) + + + +nodes = ["A0", "A1", "A2", "C", "B2", "B1", "B0", "B-1"] +# nodes +complete_graph = nx.Graph() +for node in nodes: + complete_graph.add_node(node) + +for i in range(1,4): + for idx in range(i, len(nodes)): + complete_graph.add_edge(nodes[idx-i], nodes[idx]) diff --git a/tests/d_graph_test.py b/tests/d_graph_test.py new file mode 100644 index 0000000000000000000000000000000000000000..0e19875e6cd468a78a0bdc269a62e02e1566c238 --- /dev/null +++ b/tests/d_graph_test.py @@ -0,0 +1,48 @@ +import unittest + +from tests.d_graph_data import unit_d_graph +from d_graph import Dgraph + + + +class TestDGraph(unittest.TestCase): + + def test_construction(self): + center, h1, h2, G = unit_d_graph + # Test basic construction + dg = Dgraph(center) + self.assertEquals(center, dg.center) + self.assertEquals(0, dg.score) + + # Test adding d-graph sides + dg.put_halves(h1, h2, G) + self.assertEquals(set(h1), set(dg.halves[0])) + self.assertEquals(set(h2), set(dg.halves[1])) + self.assertEquals(dg.connexity[0], {"A0":0,"A1":1,"A2":2}) + self.assertEquals(dg.connexity[1], {"B0":0,"B1":1,"B2":2}) + + def test_optimal_score(self): + center, h1, h2, G = unit_d_graph + dg = Dgraph(center) + dg.put_halves(h1, h2, G) + # Must be the number of transitive edges + self.assertEquals(3, dg.get_optimal_score()) + + def test_divergence(self): + center, h1, h2, G = unit_d_graph + dg = Dgraph(center) + dg.put_halves(h1, h2, G) + self.assertEquals(0.0, dg.get_link_divergence()) + + def test_dg_to_list(self): + center, h1, h2, G = unit_d_graph + dg = Dgraph(center) + dg.put_halves(h1, h2, G) + lst = dg.to_ordered_lists() + + self.assertEquals([['A0'], ['A1'], ['A2'], ['C'], ['B2'], ['B1'], ['B0']], lst) + + + +if __name__ == "__main__": + unittest.main()