Commit 76239d13 authored by Yoann Dufresne's avatar Yoann Dufresne
Browse files

Test and support previously written code

parent 31633d6a
......@@ -3,7 +3,6 @@
import sys
print(sys.argv)
with open(sys.argv[1]) as file:
header = file.readline()
nb_nodes, nb_variables = [int(x) for x in header.split()]
......
......@@ -2,25 +2,28 @@ import networkx as nx
from d2_path import Path, Unitig
""" For each node of the d2 graph, construct a node in the reducted graph.
Then, for each node, compute the closest neighbors in d2 (with equal scores) and add an edge
in the greedy graph.
@param d2 Input d2 graph (with distances already computed)
@return A greedy constructed graph.
"""
def greedy_reduct(d2):
""" Compute a graph where, for each node, the neighbors are the closest neighbors.
"""
gG = nx.Graph()
for node in d2.nodes:
gG.add_node(node)
for dgraph, node in d2.nodes.items():
if not dgraph in d2.distances:
if not dgraph.idx in d2.distances or len(d2.distances[dgraph.idx]) == 0:
continue
distances = d2.distances[dgraph]
distances = d2.distances[dgraph.idx]
min_dist = min(distances.values())
for n_dgraph, dist in distances.items():
for graph_idx, dist in distances.items():
if dist == min_dist:
gG.add_edge(node, d2.nodes[n_dgraph])
gG.add_edge(node, d2.nodes[d2.node_by_idx[graph_idx]])
return gG
......
......@@ -2,12 +2,12 @@ import networkx as nx
import itertools
from bidict import bidict
from d_graph import compute_all_max_d_graphs, filter_dominated
from d_graph import compute_all_max_d_graphs, filter_dominated, list_domination_filter
class D2Graph(object):
"""D2Graph (read it (d-graph)²)"""
def __init__(self, graph, index_size=8, verbose=True, debug=False):
def __init__(self, graph, index_size=3, verbose=True, debug=False):
super(D2Graph, self).__init__()
self.graph = graph
......@@ -22,8 +22,10 @@ class D2Graph(object):
# Name the d-graphs
# Number the d_graphs
self.node_by_idx = {}
for idx, d_graph in enumerate(self.all_d_graphs):
d_graph.idx = idx
self.node_by_idx[idx] = d_graph
# Number the edges from original graph
self.edge_idxs = {}
......@@ -40,6 +42,7 @@ class D2Graph(object):
if verbose:
print("Compute the dmer index")
self.index = self.create_index_from_tuples(index_size)
self.filter_dominated_in_index()
# Compute node distances for pair of dgraphs that share at least 1 dmer.
if verbose:
print("Compute a subset of distances")
......@@ -156,4 +159,39 @@ class D2Graph(object):
return G, bidict(nodes)
def filter_dominated_in_index(self):
    """Drop dominated d-graphs from the d-mer index and from the global containers.

    For every d-mer of ``self.index`` the associated d-graph list is reduced with
    ``list_domination_filter``. Every d-graph that was discarded for at least one
    d-mer is then removed from ``self.all_d_graphs``, from
    ``self.d_graphs_per_node[<its center>]`` and from every remaining list of the
    index. Index entries whose list ends up empty are deleted.

    Nothing is returned: the containers are updated in place.
    """
    dominated = []

    # Pass 1: keep only the undominated d-graphs of each d-mer and remember
    # the ones that were discarded.
    for dmer, candidates in self.index.items():
        kept = list_domination_filter(candidates)
        if len(kept) != len(candidates):
            dominated.extend(dg for dg in candidates if dg not in kept)
        # Re-assigning an existing key does not resize the dict, so this is
        # allowed while iterating over items().
        self.index[dmer] = kept
    dominated = frozenset(dominated)

    # Pass 2: purge the discarded d-graphs from the global containers.
    for dg in dominated:
        self.all_d_graphs.remove(dg)
        self.d_graphs_per_node[dg.center].remove(dg)

    # Pass 3: a d-graph dominated under one d-mer may still be listed under
    # another one, so sweep the whole index.
    emptied = []
    for dmer, indexed in self.index.items():
        for dg in dominated:
            if dg in indexed:
                indexed.remove(dg)
        if not indexed:
            emptied.append(dmer)

    # Deletion is deferred so the index is not resized during iteration.
    for dmer in emptied:
        del self.index[dmer]
\ No newline at end of file
......@@ -13,7 +13,7 @@ class Dgraph(object):
self.score = 0
self.halves = [None,None]
self.connexity = [None,None]
self.nodes = [center]
self.nodes = [self.center]
self.edges = []
......@@ -26,40 +26,30 @@ class Dgraph(object):
self.score = 0
self.halves[0] = h1
self.halves[1] = h2
self.nodes = sorted([self.center] + h1 + h2)
self.nodes = sorted([self.center] + self.halves[0] + self.halves[1])
self.connexity[0] = {key:0 for key in self.halves[0]}
self.connexity[1] = {key:0 for key in self.halves[1]}
self.edges = []
# Compute link arities
for node1 in h1:
for node1 in self.halves[0]:
neighbors = set(graph.neighbors(node1))
for node2 in h2:
for node2 in self.halves[1]:
if node1 == node2 or node2 in neighbors:
self.score += 1
self.connexity[0][node1] += 1
self.connexity[1][node2] += 1
# Compute links from the center to the other nodes
for idx, node1 in enumerate(self.nodes):
for node2 in self.nodes[idx+1:]:
if graph.has_edge(node1, node2):
if node1 < node2:
self.edges.append((node1, node2))
elif node2 < node1:
else:
self.edges.append((node2, node1))
# Compute links from the center to the other nodes
for node in h1:
if node < self.center:
self.edges.append((node, self.center))
else:
self.edges.append((self.center, node))
for node in h2:
if node < self.center:
self.edges.append((node, self.center))
else:
self.edges.append((self.center, node))
# Sort the halves by descending connexity
connex = self.connexity
self.halves[0].sort(reverse=True, key=lambda v: connex[0][v])
......@@ -133,9 +123,9 @@ class Dgraph(object):
# domination second condition
if len(dg1_nodes) == len(dg2_nodes):
if self.get_link_divergence() < dg.get_link_divergence():
if self.get_link_divergence() > dg.get_link_divergence():
return True
elif self.get_link_divergence() <= dg.get_link_divergence():
elif self.get_link_divergence() >= dg.get_link_divergence():
return True
return False
......@@ -155,6 +145,7 @@ class Dgraph(object):
def __hash__(self):
    """Hash the d-graph from its node list, independently of node order.

    The items returned by ``self.to_list()`` are converted to strings, sorted
    and joined with commas; the hash of that string is returned.
    """
    labels = sorted(str(node) for node in self.to_list())
    return hash(",".join(labels))
......@@ -175,7 +166,7 @@ class Dgraph(object):
def __repr__(self):
# print(self.halves)
representation = self.center + " " + str(self.score) + "/" + str(self.get_optimal_score()) + " "
representation = str(self.center) + " " + str(self.score) + "/" + str(self.get_optimal_score()) + " "
representation += "[" + ", ".join([f"{node} {self.connexity[0][node]}" for node in self.halves[0]]) + "]"
representation += "[" + ", ".join([f"{node} {self.connexity[1][node]}" for node in self.halves[1]]) + "]"
return representation
......@@ -246,6 +237,22 @@ def add_new_dg_regarding_domination(dg, undominated_dgs_list):
return undominated_dgs_list
def filter_dominated(d_graphs, overall=False, in_place=True):
    """Filter dominated d-graphs, either node by node or over all nodes at once.

    @param d_graphs Dictionary whose values are lists of d-graphs.
    @param overall When false, delegate to local_domination_filter. When true,
    gather every d-graph in a single list and filter that list globally.
    @param in_place Forwarded to local_domination_filter; not used when overall
    is true.
    @return The result of local_domination_filter when overall is false,
    otherwise the d_graphs argument itself.
    """
    if overall:
        merged = [dg for dgs in d_graphs.values() for dg in dgs]
        print(len(merged))
        merged = list_domination_filter(merged)
        print(len(merged))
        # NOTE(review): the globally filtered list is only used for the size
        # printed above; the input dictionary is returned as received.
        return d_graphs

    return local_domination_filter(d_graphs, in_place)
""" Filter the d-graphs by node. In a list of d-graph centered on a node n, if a d-graph is
completely included in another and has a higher distance score to the optimal, then it is
filtered out.
......@@ -254,18 +261,28 @@ def add_new_dg_regarding_domination(dg, undominated_dgs_list):
copy all the content in a new dictionary.
@return The filtered dictionary of d-graphs per node.
"""
def filter_dominated(d_graphs, in_place=True):
def local_domination_filter(d_graphs, in_place=True):
filtered = d_graphs if in_place else {}
# Filter node by node
for node, d_graph_list in d_graphs.items():
filtered_by_node = []
# Add the non filtered d-graph to the output
filtered[node] = list_domination_filter(d_graph_list)
# Filter d-graph by d-graph
for dg in d_graph_list:
add_new_dg_regarding_domination(dg, filtered_by_node)
return filtered
# Add the non filtered d-graph to the output
filtered[node] = filtered_by_node
""" Filter the input d-graphs list. In the list of d-graph centered on a node n, if a d-graph is
completely included in another and has a higher distance score to the optimal, then it is
filtered out.
@param d_graphs All the d-graphs to filter.
@return The filtered list of d-graphs.
"""
def list_domination_filter(d_graphs):
    """Build the list of d-graphs that survive the domination filter.

    Each d-graph of the input is submitted, in order, to
    add_new_dg_regarding_domination together with the list built so far.

    @param d_graphs Iterable of d-graphs to filter.
    @return A new list; the input is not modified by this function itself.
    """
    undominated = []
    for candidate in d_graphs:
        # The helper is relied upon to update `undominated` in place; its
        # return value is not used here.
        add_new_dg_regarding_domination(candidate, undominated)
    return undominated
......@@ -20,14 +20,14 @@ def main():
elif filename.endswith('.gexf'):
G = nx.read_gexf(filename)
d2g = d2.D2Graph(G)
d2g = d2.D2Graph(G, index_size=8)
d2g.save("data/optimization.tsv")
G, names = d2g.to_nx_graph()
nx.write_gexf(G, "data/d2_graph.gexf")
# print("Greedy reduction of the graph")
# greedy = filter_singeltons(greedy_reduct(d2g))
# nx.write_gexf(greedy, "data/d2_graph_greedy.gexf")
print("Greedy reduction of the graph")
greedy = greedy_reduct(d2g)
nx.write_gexf(greedy, "data/d2_graph_greedy.gexf")
# print("Compute unitigs from greedy reducted graph")
# unitigs = compute_unitigs(greedy, d2g)
......
......@@ -2,5 +2,5 @@
export PREVPATH=$PYTHONPATH
export PYTHONPATH=deconvolution/
pytest tests
pytest -s tests
export PYTHONPATH=$PREVPATH
import unittest
import graph_manipulator as gm
from d2_graph import D2Graph
class TestD2Algorithms(unittest.TestCase):
    """Test suite for the algorithms that operate on a d2 graph."""

    def test_greedy_reduction(self):
        """Placeholder: no assertion is made on the greedy reduction yet."""
        return None
if __name__ == "__main__":
unittest.main()
import unittest
from scipy.special import comb
from d2_graph import D2Graph
from d_graph import Dgraph
import graph_manipulator as gm
from tests.d_graph_data import complete_graph
......@@ -29,13 +31,33 @@ class TestD2Graph(unittest.TestCase):
self.assertEquals(1, len(d2.index[dmer]))
def test_to_nx_graph(self):
    """Check the networkx export of a D2Graph built on the complete-graph fixture.

    The D2Graph is built with an index size of 6; the exported graph is
    expected to hold exactly 2 nodes and 1 edge.
    """
    d2 = D2Graph(complete_graph, 6)
    # The second returned value (node names) is not checked here.
    d2G, _ = d2.to_nx_graph()

    # assertEqual replaces the assertEquals alias, which was removed in
    # Python 3.12.
    self.assertEqual(2, len(list(d2G.nodes())))
    self.assertEqual(1, len(list(d2G.edges())))
def test_linear_d2_construction(self):
for d in range(1, 10):
size = 2 * d + 3
index_k = 2 * d - 1
G = gm.generate_d_graph_chain(size, d)
d2 = D2Graph(G, index_size=index_k)
# Test the number of d-graphs
awaited_d_num = size - 2 * d
self.assertEquals(awaited_d_num, len(d2.all_d_graphs))
# Test index
awaited_index_size = comb(2*d+1, index_k) + (size - (2*d+1)) * comb(2*d, index_k-1)
print(d, size, index_k, awaited_index_size)
if len(d2.index) != awaited_index_size:
dmers = [list(x) for x in d2.index]
dmers = [str(x) for x in dmers if len(x) != len(frozenset(x))]
print("\n".join(dmers))
self.assertEquals(awaited_index_size, len(d2.index))
d2_nx = d2.nx_graph
# print(d2_nx.nodes())
# print(d2_nx.edges())
# Test connectivity
......
......@@ -2,6 +2,7 @@ import unittest
from tests.d_graph_data import unit_d_graph
from d_graph import Dgraph
import graph_manipulator as gm
......@@ -21,6 +22,24 @@ class TestDGraph(unittest.TestCase):
self.assertEquals(dg.connexity[0], {"A0":0,"A1":1,"A2":2})
self.assertEquals(dg.connexity[1], {"B0":0,"B1":1,"B2":2})
def test_linear_perfect_construction(self):
    """Check the number of edges of a d-graph built on a d-graph chain.

    For d in 1..4, a chain of 2*d + 1 nodes is generated, the middle node is
    used as center, the d first nodes as first half and the d last nodes as
    second half. The d-graph is expected to register (3*d*d + d) / 2 edges.
    """
    for d in range(1, 5):
        # subTest reports which value of d failed instead of stopping at the
        # first failure without context.
        with self.subTest(d=d):
            size = d * 2 + 1

            # nx graph construction
            G = gm.generate_d_graph_chain(size, d)

            center = d
            h1 = list(G.subgraph(list(range(d))).nodes())
            h2 = list(G.subgraph([size - 1 - x for x in range(d)]).nodes())

            # d-graph construction
            dg = Dgraph(center)
            dg.put_halves(h1, h2, G)

            # Test the internal arity. 3*d*d + d = d*(3*d + 1) is always
            # even, so the integer division is exact.
            awaited_arity = (3 * d * d + d) // 2
            self.assertEqual(awaited_arity, len(dg.edges))
def test_optimal_score(self):
center, h1, h2, G = unit_d_graph
dg = Dgraph(center)
......
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment