From d27838a25e113c322d1b076428dd85e4dbecd73f Mon Sep 17 00:00:00 2001
From: Yoann Dufresne <yoann.dufresne0@gmail.com>
Date: Wed, 24 Apr 2019 12:14:49 +0200
Subject: [PATCH] add tests and correct the values from them

---
 deconvolution/d2_graph.py   | 33 +++++++++++++++++--
 deconvolution/d_graph.py    | 30 +++++++++++-------
 deconvolution/deconvolve.py |  2 +-
 tests/__init__.py           |  0
 tests/d2_graph_test.py      | 41 ++++++++++++++++++++++++
 tests/d_graph_data.py       | 63 +++++++++++++++++++++++++++++++++++++
 tests/d_graph_test.py       | 48 ++++++++++++++++++++++++++++
 7 files changed, 202 insertions(+), 15 deletions(-)
 create mode 100644 tests/__init__.py
 create mode 100644 tests/d2_graph_test.py
 create mode 100644 tests/d_graph_data.py
 create mode 100644 tests/d_graph_test.py

diff --git a/deconvolution/d2_graph.py b/deconvolution/d2_graph.py
index ba67b39..46c6886 100644
--- a/deconvolution/d2_graph.py
+++ b/deconvolution/d2_graph.py
@@ -1,5 +1,7 @@
 import networkx as nx
-import d_graph as dgm
+import itertools
+
+from d_graph import compute_all_max_d_graphs
 
 
 class D2Graph(object):
@@ -9,7 +11,7 @@ class D2Graph(object):
         self.graph = graph
 
         # Compute all the d-graphs
-        self.d_graphs = dgm.compute_all_max_d_graphs(self.graph)
+        self.d_graphs = compute_all_max_d_graphs(self.graph)
         
         # Index all the d-graphes
         self.index = self.create_index()
@@ -17,6 +19,33 @@ class D2Graph(object):
 
     def create_index(self):
         index = {}
+
+        perfect = 0
+        for node in self.d_graphs:
+            for dg in self.d_graphs[node]:
+                lst = dg.to_ordered_lists()
+                # Generate all dmers without the first node
+                # pull all the values 
+                concat = [el for l in lst[1:] for el in l]
+                # generate dmers
+                for idx in range(len(lst[0])):
+                    dmer = frozenset(concat + lst[0][:idx] + lst[0][idx+1:])
+                    if not dmer in index:
+                        index[dmer] = [dg]
+                    else:
+                        index[dmer].append(dg)
+
+                # Generate all dmers without the last node
+                # pull all the values 
+                concat = [el for l in lst[:-1] for el in l]
+                # generate dmers
+                for idx in range(len(lst[-1])):
+                    dmer = frozenset(concat + lst[-1][:idx] + lst[-1][idx+1:])
+                    if not dmer in index:
+                        index[dmer] = [dg]
+                    else:
+                        index[dmer].append(dg)
+
         return index
 
 
diff --git a/deconvolution/d_graph.py b/deconvolution/d_graph.py
index 4ce1185..cffa79b 100644
--- a/deconvolution/d_graph.py
+++ b/deconvolution/d_graph.py
@@ -42,7 +42,7 @@ class Dgraph(object):
         self.halves[1].sort(reverse=True, key=lambda v: connex[1][v])
 
 
-    def get_link_ratio(self):
+    def get_link_divergence(self):
         return abs((self.score / self.get_optimal_score()) - 1)
 
 
@@ -51,29 +51,35 @@ class Dgraph(object):
         return max_len * (max_len - 1) / 2
 
 
-    def to_ordered_list(self):
-        # TODO : Can't be uniq (see for corrections)
-        return self.halves[0][::-1] + [self.center] + self.halves[1]
+    def to_ordered_lists(self):
+        hands = [[],[]]
+        for idx in range(2):
+            prev_connectivity = -1
+            for node in self.halves[idx]:
+                # group nodes by similar connectivity
+                value = self.connexity[idx][node]
+                if value != prev_connectivity:
+                    hands[idx].append([])
+                    prev_connectivity = value
+                hands[idx][-1].append(node)
+
+        return hands[0][::-1] + [[self.center]] + hands[1]
 
 
     def __eq__(self, other):
-        my_tuple = (self.get_link_ratio(), self.get_optimal_score())
-        other_tuple = (other.get_link_ratio(), other.get_optimal_score())
+        my_tuple = (self.get_link_divergence(), self.get_optimal_score())
+        other_tuple = (other.get_link_divergence(), other.get_optimal_score())
         return (my_tuple == other_tuple)
 
     def __ne__(self, other):
         return not (self == other)
 
     def __lt__(self, other):
-        my_tuple = (self.get_link_ratio(), self.get_optimal_score())
-        other_tuple = (other.get_link_ratio(), other.get_optimal_score())
+        my_tuple = (self.get_link_divergence(), self.get_optimal_score())
+        other_tuple = (other.get_link_divergence(), other.get_optimal_score())
         return (my_tuple < other_tuple)
 
 
-    def __hash__(self):
-        return frozenset(self.to_ordered_list()).__hash__()
-
-
     def __repr__(self):
         # print(self.halves)
         representation = self.center + " " + str(self.score) + "/" + str(self.get_optimal_score()) + " "
diff --git a/deconvolution/deconvolve.py b/deconvolution/deconvolve.py
index d6cba26..bcf1e86 100755
--- a/deconvolution/deconvolve.py
+++ b/deconvolution/deconvolve.py
@@ -20,7 +20,7 @@ def main():
         G = nx.read_gexf(filename)
 
     d2g = d2.D2Graph(G)
-    d2g.save_to_file("data/d2_graph.gexf")
+    # d2g.save_to_file("data/d2_graph.gexf")
 
 if __name__ == "__main__":
     main()
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/tests/d2_graph_test.py b/tests/d2_graph_test.py
new file mode 100644
index 0000000..961aa8c
--- /dev/null
+++ b/tests/d2_graph_test.py
@@ -0,0 +1,41 @@
+import unittest
+
+from d2_graph import D2Graph
+from d_graph import Dgraph
+
+from tests.d_graph_data import unit_d_graph, unit_overlapp_d_graph, complete_graph
+
+
+class TestD2Graph(unittest.TestCase):
+    def test_construction(self):
+        d2 = D2Graph(complete_graph)
+
+        # Evaluate the number of candidate unit d_graphs generated
+        for node, candidates in d2.d_graphs.items():
+            if node == "C" or node == "B2":
+                self.assertEquals(1, len(candidates))
+            else:
+                self.assertEquals(0, len(candidates))
+
+        # Evaluate the hashes
+        self.assertEquals(3, len(d2.index))
+
+        udg = Dgraph(unit_d_graph[0])
+        udg.put_halves(unit_d_graph[1], unit_d_graph[2], unit_d_graph[3])
+        uodg = Dgraph(unit_overlapp_d_graph[0])
+        uodg.put_halves(unit_overlapp_d_graph[1], unit_overlapp_d_graph[2], unit_overlapp_d_graph[3])
+
+        key = frozenset({'A2', 'A1', 'B1', 'C', 'B0', 'B2'})
+        self.assertEquals(2, len(d2.index[key]))
+        self.assertTrue(udg in d2.index[key])
+        self.assertTrue(uodg in d2.index[key])
+        key = frozenset({'A0', 'A2', 'A1', 'B1', 'C', 'B2'})
+        self.assertEquals(1, len(d2.index[key]))
+        self.assertEquals(udg, d2.index[key][0])
+        key = frozenset({'A2', 'B-1', 'B1', 'C', 'B2', 'B0'})
+        self.assertEquals(1, len(d2.index[key]))
+        self.assertEquals(uodg, d2.index[key][0])
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/tests/d_graph_data.py b/tests/d_graph_data.py
new file mode 100644
index 0000000..5acdbb2
--- /dev/null
+++ b/tests/d_graph_data.py
@@ -0,0 +1,63 @@
+import networkx as nx
+
+# Describe a unit d-graph with d = 3 centered on the C node
+
+__h1 = ["A0", "A1", "A2"]
+__h2 = ["B0", "B1", "B2"]
+# nodes
+__G = nx.Graph()
+for node in __h1 + __h2:
+    __G.add_node(node)
+# left edges
+__G.add_edge("A0", "A1")
+__G.add_edge("A2", "A1")
+__G.add_edge("A0", "A2")
+# right edges
+__G.add_edge("B0", "B1")
+__G.add_edge("B2", "B1")
+__G.add_edge("B0", "B2")
+# Transitive edges
+__G.add_edge("A1", "B2")
+__G.add_edge("A2", "B1")
+__G.add_edge("A2", "B2")
+
+
+unit_d_graph = ("C", __h1, __h2, __G)
+
+
+
+# Describe a unit d-graph with d = 3 centered on the C node
+
+__h1 = ["A1", "A2", "C"]
+__h2 = ["B0", "B1", "B-1"]
+# nodes
+__G = nx.Graph()
+for node in __h1 + __h2:
+    __G.add_node(node)
+# left edges
+__G.add_edge("C", "A1")
+__G.add_edge("A2", "A1")
+__G.add_edge("C", "A2")
+# right edges
+__G.add_edge("B0", "B1")
+__G.add_edge("B-1", "B1")
+__G.add_edge("B0", "B-1")
+# Transitive edges
+__G.add_edge("A2", "B1")
+__G.add_edge("C", "B1")
+__G.add_edge("C", "B0")
+
+
+unit_overlapp_d_graph = ("B2", __h1, __h2, __G)
+
+
+
+nodes = ["A0", "A1", "A2", "C", "B2", "B1", "B0", "B-1"]
+# nodes
+complete_graph = nx.Graph()
+for node in nodes:
+    complete_graph.add_node(node)
+
+for i in range(1,4):
+    for idx in range(i, len(nodes)):
+        complete_graph.add_edge(nodes[idx-i], nodes[idx])
diff --git a/tests/d_graph_test.py b/tests/d_graph_test.py
new file mode 100644
index 0000000..0e19875
--- /dev/null
+++ b/tests/d_graph_test.py
@@ -0,0 +1,48 @@
+import unittest
+
+from tests.d_graph_data import unit_d_graph
+from d_graph import Dgraph
+
+
+
+class TestDGraph(unittest.TestCase):
+
+    def test_construction(self):
+        center, h1, h2, G = unit_d_graph
+        # Test basic construction
+        dg = Dgraph(center)
+        self.assertEquals(center, dg.center)
+        self.assertEquals(0, dg.score)
+
+        # Test adding d-graph sides
+        dg.put_halves(h1, h2, G)
+        self.assertEquals(set(h1), set(dg.halves[0]))
+        self.assertEquals(set(h2), set(dg.halves[1]))
+        self.assertEquals(dg.connexity[0], {"A0":0,"A1":1,"A2":2})
+        self.assertEquals(dg.connexity[1], {"B0":0,"B1":1,"B2":2})
+
+    def test_optimal_score(self):
+        center, h1, h2, G = unit_d_graph
+        dg = Dgraph(center)
+        dg.put_halves(h1, h2, G)
+        # Must be the number of transitive edges
+        self.assertEquals(3, dg.get_optimal_score())
+
+    def test_divergence(self):
+        center, h1, h2, G = unit_d_graph
+        dg = Dgraph(center)
+        dg.put_halves(h1, h2, G)
+        self.assertEquals(0.0, dg.get_link_divergence())
+
+    def test_dg_to_list(self):
+        center, h1, h2, G = unit_d_graph
+        dg = Dgraph(center)
+        dg.put_halves(h1, h2, G)
+        lst = dg.to_ordered_lists()
+
+        self.assertEquals([['A0'], ['A1'], ['A2'], ['C'], ['B2'], ['B1'], ['B0']], lst)
+
+
+
+if __name__ == "__main__":
+    unittest.main()
-- 
GitLab