Commit 309388be authored by Yoann Dufresne's avatar Yoann Dufresne
Browse files

fix test issues

parent cf3ae74c
#!/usr/bin/env python3
import sys
print(sys.argv)
with open(sys.argv[1]) as file:
header = file.readline()
nb_nodes, nb_variables = [int(x) for x in header.split()]
present_variables = [False]*nb_variables
for _ in range(nb_nodes):
line = file.readline()
variables = [int(x) for x in line.split()[1:]]
for var in variables:
present_variables[var] = True
for idx, val in enumerate(present_variables):
if not val:
print(f"{idx} not present")
\ No newline at end of file
......@@ -7,14 +7,14 @@ from d_graph import compute_all_max_d_graphs
class D2Graph(object):
"""D2Graph (read it (d-graph)²)"""
def __init__(self, graph, index_size=8, verbose=True):
def __init__(self, graph, index_size=8, verbose=True, debug=False):
super(D2Graph, self).__init__()
self.graph = graph
# Compute all the d-graphs
if verbose:
print("Compute the unit d-graphs")
self.d_graphs_per_node = compute_all_max_d_graphs(self.graph)
self.d_graphs_per_node = compute_all_max_d_graphs(self.graph, debug=debug)
self.all_d_graphs = []
for d_graphs in self.d_graphs_per_node.values():
self.all_d_graphs.extend(d_graphs)
......@@ -30,6 +30,8 @@ class D2Graph(object):
for idx, edge in enumerate(self.graph.edges()):
if edge == (edge[1], edge[0]):
self.nb_uniq_edge += 1
if edge in self.edge_idxs:
print("Edge already present")
self.edge_idxs[edge] = idx
self.edge_idxs[(edge[1], edge[0])] = idx
......
......@@ -46,6 +46,19 @@ class Dgraph(object):
elif node2 < node1:
self.edges.append((node2, node1))
# Compute links from the center to the other nodes
for node in h1:
if node < self.center:
self.edges.append((node, self.center))
else:
self.edges.append((self.center, node))
for node in h2:
if node < self.center:
self.edges.append((node, self.center))
else:
self.edges.append((self.center, node))
# Sort the halves by descending connexity
connex = self.connexity
......@@ -150,7 +163,7 @@ class Dgraph(object):
@param n_best Only keep n d-graphs (the nearest to 1.0 ratio)
@return A dictionary associating each node to its list of all possible d-graphs. The d-graphs are sorted by decreasing ratio.
"""
def compute_all_max_d_graphs(graph, n_best=10):
def compute_all_max_d_graphs(graph, n_best=100, debug=False):
d_graphes = {}
for node in list(graph.nodes()):
......@@ -172,14 +185,14 @@ def compute_all_max_d_graphs(graph, n_best=10):
optimal_score = d_graph.get_optimal_score()
# For a minimal connection To avoid too much shared nodes
if d_graph.score < optimal_score / 2 or d_graph.score >= 1.5 * optimal_score:
if d_graph.score < optimal_score / 4 or d_graph.score >= 1.6 * optimal_score:
continue
node_d_graphes.append(d_graph)
# Cut the the distribution queue
d_graphes[node] = sorted(node_d_graphes)[:n_best]
d_graphes[node] = sorted(node_d_graphes)
# print(node_d_graphes)
return d_graphes
import networkx as nx
""" Generate a d-graph chain (succession of unit d-graphs).
If you slice any 2*d+1 part of the graph, it will be a unit d-graph
@parem size The number of nodes in the chain (should not be less than 2*d+1)
@param d The number of connection on the left and on the right for any node
@return The d-graph chain
"""
def generate_d_graph_chain(size, d):
G = nx.Graph()
for idx in range(size):
# Create the node
G.add_node(idx)
# Link the node to d previous nodes
for prev in range(max(0, idx-d), idx):
G.add_edge(prev, idx)
return G
""" Merge 2 nodes of a graph G.
The new node have edges from both of the previous nodes (dereplicated).
If a link between node1 and node2 exist, it's discarded.
@param G The graph to manipulate
@param node1 First node to merge
@param node2 Second node to merge
@return The modified graph G
"""
def merge_nodes(G, node1, node2):
# Create the new node
new_node = f"{node1}_{node2}" if node1 < node2 else f"{node2}_{node1}"
G.add_node(new_node)
# Add the edges from previous nodes
for edge in G.edges(node1):
neighbor = edge[0] if edge[0] != node1 else edge[1]
if neighbor == node2:
continue
G.add_edge(neighbor, new_node)
for edge in G.edges(node2):
neighbor = edge[0] if edge[0] != node2 else edge[1]
if neighbor == node1:
continue
G.add_edge(neighbor, new_node)
# Remove previous nodes from the graph
G.remove_node(node1)
G.remove_node(node2)
return G
#!/usr/bin/env bash
export PREVPATH=$PYTHONPATH
export PYTHONPATH=deconvolution/
pytest tests
export PYTHONPATH=$PREVPATH
......@@ -3,7 +3,7 @@ import unittest
from d2_graph import D2Graph
from d_graph import Dgraph
from tests.d_graph_data import unit_d_graph, unit_overlapp_d_graph, complete_graph
from tests.d_graph_data import complete_graph
class TestD2Graph(unittest.TestCase):
......@@ -11,7 +11,7 @@ class TestD2Graph(unittest.TestCase):
d2 = D2Graph(complete_graph, 6)
# Evaluate the number of candidate unit d_graphs generated
for node, candidates in d2.d_graphs.items():
for node, candidates in d2.d_graphs_per_node.items():
if node == "C" or node == "B2":
self.assertEquals(1, len(candidates))
else:
......
......@@ -43,6 +43,10 @@ class TestDGraph(unittest.TestCase):
self.assertEquals([['A0'], ['A1'], ['A2'], ['C'], ['B2'], ['B1'], ['B0']], lst)
# def test_list_dgraphs(self):
if __name__ == "__main__":
unittest.main()
import unittest
from d_graph import Dgraph
class TestGraphManipulation(unittest.TestCase):
def test_dg_to_list(self):
center, h1, h2, G = unit_d_graph
dg = Dgraph(center)
dg.put_halves(h1, h2, G)
lst = dg.to_ordered_lists()
self.assertEquals([['A0'], ['A1'], ['A2'], ['C'], ['B2'], ['B1'], ['B0']], lst)
if __name__ == "__main__":
unittest.main()
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment