...
 
Commits (2)
import networkx as nx
# from deconvolution.d2graph.d2_path import Unitig
""" Remove unnecessary transitions
"""
def transitive_reduction(d2g):
# Remove self edges
for edge in d2g.edges():
if edge[0] == edge[1]:
d2g.remove_edge(*edge)
# Analyse the graph for removable edges
to_remove = set()
for edge in d2g.edges():
dg1_name, dg2_name = edge
# Extract dgs
dg1 = d2g.node_by_idx[int(dg1_name)]
dg2 = d2g.node_by_idx[int(dg2_name)]
# Extract common neighbors
nei1 = frozenset(d2g.neighbors(dg1_name))
nei2 = frozenset(d2g.neighbors(dg2_name))
common = nei1.intersection(nei2)
# Look for all the common neighbors, if edge must be remove or not
current_dist = d2g[dg1_name][dg2_name]["distance"]
for node in common:
com_dg = d2g.node_by_idx[int(node)]
extern_dist = d2g[dg1_name][node]["distance"] + d2g[node][dg2_name]["distance"]
# If better path, remove the edge
if extern_dist <= current_dist:
to_remove.add((dg1_name, dg2_name))
# Remove the edges
for edge in to_remove:
dg1_name, dg2_name = edge
# Remove from graph
d2g.remove_edge(dg1_name, dg2_name)
print(f"{len(to_remove)} edge removed")
print(f"{len(d2g.edges())} remaining")
""" For each node of the d2 graph, construct a node in the reduced graph.
Then, for each node, compute the closest neighbors in d2 (with equal scores) and add an edge
in the greedy graph.
@param d2 Input d2 graph (with distances already computed)
@return A greedy constructed graph.
"""
def greedy_reduct(d2):
gG = nx.Graph()
for node in d2.nodes:
gG.add_node(node)
for dgraph, node in d2.nodes.items():
if not dgraph.idx in d2.distances or len(d2.distances[dgraph.idx]) == 0:
continue
distances = d2.distances[dgraph.idx]
min_dist = min(distances.values())
for graph_idx, dist in distances.items():
if dist == min_dist:
gG.add_edge(node, d2.nodes[d2.node_by_idx[graph_idx]])
return gG
def filter_singeltons(graph):
""" Remove the isolated nodes from graph.
"""
nodelist = list(graph.nodes())
for node in nodelist:
if len(graph[node]) == 0:
graph.remove_node(node)
return graph
# """ Compute the unambiguous paths in a d2g. The d2g must not contain singletons.
# The unitigs are sorted by increasing penalty first and decreasing size second.
# @param d2g a d2g graph
# @return a list of unitigs
# """
# def compute_unitigs(d2g):
# unitigs = []
# used_nodes = {int(node): False for node in d2g.nodes()}
#
# for node_id in d2g.nodes():
# node_idx = int(node_id)
# node = d2g.node_by_idx[node_idx]
#
# # If already tested
# if used_nodes[node.idx]:
# continue
# used_nodes[node.idx] = True
#
# if d2g.degree(str(node.idx)) > 2:
# # Not a unitig
# continue
#
# # Create new unitigs
# unitig = compute_unitig_from(d2g, node)
# unitigs.append(unitig)
# for node in unitig:
# used_nodes[node.idx] = True
#
# # Sort the unitigs
# unitigs.sort(key=lambda x: (x.normalized_penalty(), -len(x)))
#
# return unitigs
#
#
# """ Compute a unitig starting from the node regarding the graph.
# The unitig will be extended on both sides
# @param d2g The d2g to look for unitig
# @param node The start node of the unitig
# @return The constructed unitig
# """
# def compute_unitig_from(d2g, node):
# unitig = Unitig(d2g)
# unitig.add_path([node])
# if d2g.degree(str(node.idx)) == 2:
# left, right = d2g.neighbors(str(node.idx))
# else:
# left = next(d2g.neighbors(str(node.idx)))
# right = None
#
# # Extends first side
# prev_node = node
# current_node = d2g.node_by_idx[int(left)]
# unitig = extend_unitig(unitig, d2g, prev_node, current_node)
# unitig.revert()
#
# # Extends second side
# prev_node = node
# current_node = d2g.node_by_idx[int(right)] if right is not None else None
# unitig = extend_unitig(unitig, d2g, prev_node, current_node)
#
# return unitig
#
#
# """ Directional extension of the unitig. Uses the previous and current nodes to detect the extension direction.
# @param unitig The unitig to extend. Will be modified during the execution.
# @param d2g The d2g to follow for extension
# @param prev_node Node already added in the unitig
# @param current_node Node to add into the unitig and used to select the next node to add. If not set, stop the extension.
# @return Return the modified unitig.
# """
# def extend_unitig(unitig, d2g, prev_node, current_node):
# if current_node is None:
# return unitig
#
# # Add the node
# unitig.add_right(current_node)
#
# # Look for the next node
# next_candidates = list(d2g.neighbors(str(current_node.idx)))
# next_candidates.remove(str(prev_node.idx))
# # Select next node
# next_node = d2g.node_by_idx[int(next_candidates[0])] if len(next_candidates) == 1 else None
# # Continue extension
# return extend_unitig(unitig, d2g, current_node, next_node)
...@@ -10,12 +10,12 @@ from deconvolution.dgraph.CliqueDGFactory import CliqueDGFactory ...@@ -10,12 +10,12 @@ from deconvolution.dgraph.CliqueDGFactory import CliqueDGFactory
from deconvolution.dgraph.LouvainDGFactory import LouvainDGFactory from deconvolution.dgraph.LouvainDGFactory import LouvainDGFactory
class D2Graph(nx.Graph): class LcpGraph(nx.Graph):
"""D2Graph (read it (d-graph)²)""" """LcpGraph"""
def __init__(self, debug=False, debug_path='.'): def __init__(self, debug=False, debug_path='.'):
super(D2Graph, self).__init__() super(LcpGraph, self).__init__()
self.all_d_graphs = [] self.all_lcp = []
self.d_graphs_per_node = {} self.lcp_per_node = {}
self.node_by_idx = {} self.node_by_idx = {}
self.barcode_graph = None self.barcode_graph = None
self.index = None self.index = None
...@@ -35,7 +35,7 @@ class D2Graph(nx.Graph): ...@@ -35,7 +35,7 @@ class D2Graph(nx.Graph):
def subgraph(self, nodes): def subgraph(self, nodes):
nodes = frozenset(nodes) nodes = frozenset(nodes)
G = D2Graph(self.barcode_graph) G = LcpGraph(self.barcode_graph)
G.barcode_edge_idxs = self.barcode_edge_idxs G.barcode_edge_idxs = self.barcode_edge_idxs
# Add sub-nodes # Add sub-nodes
...@@ -75,20 +75,20 @@ class D2Graph(nx.Graph): ...@@ -75,20 +75,20 @@ class D2Graph(nx.Graph):
# Compute all the d-graphs # Compute all the d-graphs
if verbose: if verbose:
print("Computing the unit d-graphs..") print("Computing the unit d-graphs..")
dg_factory = None lcp_factory = None
if clique_mode == "louvain": if clique_mode == "louvain":
dg_factory = LouvainDGFactory(self.barcode_graph) lcp_factory = LouvainDGFactory(self.barcode_graph)
else: else:
dg_factory = CliqueDGFactory(self.barcode_graph, min_size_clique=min_size_clique, debug=self.debug, debug_path=self.debug_path) lcp_factory = CliqueDGFactory(self.barcode_graph, min_size_clique=min_size_clique, debug=self.debug, debug_path=self.debug_path)
self.d_graphs_per_node = dg_factory.generate_all_dgraphs(threads=threads, verbose=verbose) self.lcp_per_node = lcp_factory.generate_all_dgraphs(threads=threads, verbose=verbose)
if verbose: if verbose:
counts = sum(len(x) for x in self.d_graphs_per_node.values()) counts = sum(len(x) for x in self.lcp_per_node.values())
print(f"\t {counts} computed d-graphs") print(f"\t {counts} computed d-graphs")
for d_graphs in self.d_graphs_per_node.values(): for d_graphs in self.lcp_per_node.values():
self.all_d_graphs.extend(d_graphs) self.all_lcp.extend(d_graphs)
# Number the d_graphs # Number the d_graphs
for idx, d_graph in enumerate(self.all_d_graphs): for idx, d_graph in enumerate(self.all_lcp):
d_graph.idx = idx d_graph.idx = idx
self.node_by_idx[idx] = d_graph self.node_by_idx[idx] = d_graph
...@@ -133,7 +133,7 @@ class D2Graph(nx.Graph): ...@@ -133,7 +133,7 @@ class D2Graph(nx.Graph):
dg = Dgraph.load(data["udg"], data["score"], data["barcode_edges"]) dg = Dgraph.load(data["udg"], data["score"], data["barcode_edges"])
self.variables.update(dg.edges) self.variables.update(dg.edges)
self.bidict_nodes[node] = dg self.bidict_nodes[node] = dg
self.all_d_graphs.append(dg) self.all_lcp.append(dg)
if dg.idx == -1: if dg.idx == -1:
dg.idx = int(node) dg.idx = int(node)
self.node_by_idx[dg.idx] = dg self.node_by_idx[dg.idx] = dg
...@@ -141,59 +141,33 @@ class D2Graph(nx.Graph): ...@@ -141,59 +141,33 @@ class D2Graph(nx.Graph):
self.bidict_nodes = bidict(self.bidict_nodes) self.bidict_nodes = bidict(self.bidict_nodes)
def create_index_from_tuples(self, tuple_size=3, verbose=True):
index = {}
if verbose:
print("\tIndex d-graphs")
for lst_idx, dg in enumerate(self.all_d_graphs):
if verbose:
sys.stdout.write(f"\r\t{lst_idx+1}/{len(self.all_d_graphs)}")
sys.stdout.flush()
nodelist = dg.to_sorted_list()
if len(nodelist) < tuple_size:
continue
# Generate all tuplesize-mers
for dmer in itertools.combinations(nodelist, tuple_size):
if dmer not in index:
index[dmer] = set()
index[dmer].add(dg)
if verbose:
print()
return index
def create_graph_from_node_neighborhoods(self, neighborhood_threshold=0.25): def create_graph_from_node_neighborhoods(self, neighborhood_threshold=0.25):
nodes = {} nodes = {}
# Create the nodes of d2g from udgs # Create the nodes of lcpg from udgs
for dg in self.all_d_graphs: for lcp in self.all_lcp:
nodes[dg] = dg.idx nodes[lcp] = lcp.idx
self.add_node(nodes[dg]) self.add_node(nodes[lcp])
# Add covering barcode edges # Add covering barcode edges
barcode_edges = " ".join([str(x) for x in dg.edges]) barcode_edges = " ".join([str(x) for x in lcp.edges])
self.nodes[nodes[dg]]["barcode_edges"] = barcode_edges self.nodes[nodes[lcp]]["barcode_edges"] = barcode_edges
self.nodes[nodes[dg]]["score"] = f"{dg.score}/{dg.get_optimal_score()}" self.nodes[nodes[lcp]]["score"] = f"{lcp.score}/{lcp.get_optimal_score()}"
self.nodes[nodes[dg]]["udg"] = str(dg) self.nodes[nodes[lcp]]["udg"] = str(lcp)
self.nodes[nodes[dg]]["central_node_barcode"] = str(dg.center) self.nodes[nodes[lcp]]["central_node_barcode"] = str(lcp.center)
# Create the edges from neighbor edges # Create the edges from neighbor edges
for dg in self.all_d_graphs: for lcp in self.all_lcp:
for node in dg.to_node_set(): for node in lcp.to_node_set():
if node == dg.center: if node == lcp.center:
continue continue
entry = frozenset({node}) entry = frozenset({node})
if entry in self.d_graphs_per_node: if entry in self.lcp_per_node:
colliding_dgs = self.d_graphs_per_node[entry] colliding_dgs = self.lcp_per_node[entry]
for colliding_dg in colliding_dgs: for colliding_lcp in colliding_dgs:
distance = dg.distance_to(colliding_dg) distance = lcp.distance_to(colliding_lcp)
distance_ratio = distance / (len(dg.nodes) + len(colliding_dg.nodes)) distance_ratio = distance / (len(lcp.nodes) + len(colliding_lcp.nodes))
if distance_ratio <= neighborhood_threshold: if distance_ratio <= neighborhood_threshold:
self.add_edge(nodes[dg], nodes[colliding_dg], distance=distance) self.add_edge(nodes[lcp], nodes[colliding_lcp], distance=distance)
# Filter out singletons # Filter out singletons
graph_nodes = list(nodes) graph_nodes = list(nodes)
...@@ -203,45 +177,3 @@ class D2Graph(nx.Graph): ...@@ -203,45 +177,3 @@ class D2Graph(nx.Graph):
del nodes[n] del nodes[n]
return bidict(nodes) return bidict(nodes)
def create_graph_from_index(self):
nodes = {}
for dmer in self.index:
dgs = list(set(self.index[dmer]))
for d_idx, dg in enumerate(dgs):
# Create a node name
if dg not in nodes:
nodes[dg] = dg.idx
# Add the node
self.add_node(nodes[dg])
# Add covering barcode edges
barcode_edges = " ".join([str(self.barcode_edge_idxs[x]) for x in dg.edges])
self.nodes[nodes[dg]]["barcode_edges"] = barcode_edges
self.nodes[nodes[dg]]["score"] = f"{dg.score}/{dg.get_optimal_score()}"
self.nodes[nodes[dg]]["udg"] = str(dg)
# Add the edges
for prev_idx in range(d_idx):
prev_dg = dgs[prev_idx]
# Add on small distances
d = dg.distance_to(prev_dg)
if d <= min(len(dg.node_set)/2, len(prev_dg.node_set)/2):
self.add_edge(nodes[dg], nodes[prev_dg], distance=d)
return bidict(nodes)
def compute_distances(self):
for x, y, data in self.edges(data=True):
dg1 = self.node_by_idx[x]
dg2 = self.node_by_idx[y]
if dg1 == dg2:
continue
# Distance computing and adding in the dist dicts
d = dg1.distance_to(dg2)
data["distance"] = d
...@@ -3,14 +3,14 @@ from collections import Counter ...@@ -3,14 +3,14 @@ from collections import Counter
import sys import sys
""" Represent an udg path into a d2g graph """ Represent an udg path into a lcpg graph
""" """
class Path(list): class Path(list):
def __init__(self, d2g): def __init__(self, lcpg):
super(Path, self).__init__() super(Path, self).__init__()
self.d2g = d2g self.lcpg = lcpg
self.covering_variables = {x: 0 for x in self.d2g.variables} self.covering_variables = {x: 0 for x in self.lcpg.variables}
self.covering_value = 0 self.covering_value = 0
# a succession of Counter (multiset) # a succession of Counter (multiset)
self.barcode_order = [] self.barcode_order = []
...@@ -19,7 +19,7 @@ class Path(list): ...@@ -19,7 +19,7 @@ class Path(list):
self.size_score = 0 self.size_score = 0
def append(self, obj) -> None: def append(self, obj) -> None:
lcp = self.d2g.get_lcp(obj) lcp = self.lcpg.get_lcp(obj)
# Update the covering variables # Update the covering variables
for edge_idx in lcp.edges: for edge_idx in lcp.edges:
...@@ -100,7 +100,7 @@ class Path(list): ...@@ -100,7 +100,7 @@ class Path(list):
self.barcode_order.pop(set_idx) self.barcode_order.pop(set_idx)
def copy(self): def copy(self):
copy = Path(self.d2g) copy = Path(self.lcpg)
# Copy the list # Copy the list
for x in self: for x in self:
......
import networkx as nx
# from deconvolution.lcpgraph.d2_path import Unitig
def transitive_reduction(lcpg):
""" Remove unnecessary transitions
"""
# Remove self edges
for edge in lcpg.edges():
if edge[0] == edge[1]:
lcpg.remove_edge(*edge)
# Analyse the graph for removable edges
to_remove = set()
for edge in lcpg.edges():
lcp1_name, lcp2_name = edge
# Extract common neighbors
nei1 = frozenset(lcpg.neighbors(lcp1_name))
nei2 = frozenset(lcpg.neighbors(lcp2_name))
common = nei1.intersection(nei2)
# Look for all the common neighbors, if edge must be remove or not
current_dist = lcpg[lcp1_name][lcp2_name]["distance"]
for node in common:
com_dg = lcpg.node_by_idx[int(node)]
extern_dist = lcpg[lcp1_name][node]["distance"] + lcpg[node][lcp2_name]["distance"]
# If better path, remove the edge
if extern_dist <= current_dist:
to_remove.add((lcp1_name, lcp2_name))
# Remove the edges
for edge in to_remove:
lcp1_name, lcp2_name = edge
# Remove from graph
lcpg.remove_edge(lcp1_name, lcp2_name)
print(f"{len(to_remove)} edge removed")
print(f"{len(lcpg.edges())} remaining")
from deconvolution.d2graph.d2_path import Path from deconvolution.lcpgraph.lcp_path import Path
""" Greedy algorithm. Start with th most probable unitig (ie lowest normalized penalty first and largest unitig for """ Greedy algorithm. Start with th most probable unitig (ie lowest normalized penalty first and largest unitig for
equalities). Then extends on both side to the nearest interesting unitig. equalities). Then extends on both side to the nearest interesting unitig.
...@@ -34,7 +34,7 @@ def construct_path_from_unitigs(unitigs, d2g): ...@@ -34,7 +34,7 @@ def construct_path_from_unitigs(unitigs, d2g):
""" Look for the next unitig to join from right endpoint of the path """ Look for the next unitig to join from right endpoint of the path
@param path Currently created path @param path Currently created path
@param unitigs List of unitigs of interest (be careful to fill this list with unitigs that add new covered variables @param unitigs List of unitigs of interest (be careful to fill this list with unitigs that add new covered variables
@param d2g The d2g to use as path support @param lcpg The lcpg to use as path support
@return (path_to, utg) Respectively the path to the next unitig of interest (from the right of the current path) and @return (path_to, utg) Respectively the path to the next unitig of interest (from the right of the current path) and
the corresponding unitig (linked to the path from left to right) the corresponding unitig (linked to the path from left to right)
""" """
...@@ -75,8 +75,8 @@ def _search_way_to_next_unitig(path, unitigs, d2g): ...@@ -75,8 +75,8 @@ def _search_way_to_next_unitig(path, unitigs, d2g):
""" Search the min-penalty paths from a start udg to any of the target nodes. """ Search the min-penalty paths from a start udg to any of the target nodes.
@param start_udg The start node of the path @param start_udg The start node of the path
@param targets A list of udg of interest @param targets A list of udg of interest
@param d2g The d2g to follow @param lcpg The lcpg to follow
@param forbidden_udgs excluded nodes from the d2g @param forbidden_udgs excluded nodes from the lcpg
@return A list of equivalent paths @return A list of equivalent paths
""" """
def _search_endpoint(start_udg, targets, d2g, forbidden_udgs): def _search_endpoint(start_udg, targets, d2g, forbidden_udgs):
......
import random import random
from collections import Counter from collections import Counter
from deconvolution.d2graph.d2_path import Path from deconvolution.lcpgraph.lcp_path import Path
class Optimizer: class Optimizer:
......
...@@ -4,8 +4,8 @@ import networkx as nx ...@@ -4,8 +4,8 @@ import networkx as nx
import argparse import argparse
import sys import sys
from deconvolution.d2graph import d2_graph as d2 from deconvolution.lcpgraph import lcp_graph as d2
from deconvolution.d2graph import d2_algorithms as d2a from deconvolution.lcpgraph import lcpg_algorithms as d2a
def parse_arguments(): def parse_arguments():
...@@ -33,7 +33,7 @@ def main(): ...@@ -33,7 +33,7 @@ def main():
# Loading # Loading
print("--- lcp graph loading ---") print("--- lcp graph loading ---")
lcpg = d2.D2Graph() lcpg = d2.LcpGraph()
lcpg.load(lcpg_name) lcpg.load(lcpg_name)
# Algorithms for reduction # Algorithms for reduction
......
...@@ -4,7 +4,7 @@ import networkx as nx ...@@ -4,7 +4,7 @@ import networkx as nx
import argparse import argparse
import sys import sys
from deconvolution.d2graph import d2_graph as d2, path_optimization as po from deconvolution.lcpgraph import lcp_graph as d2, path_optimization as po
def parse_arguments(): def parse_arguments():
...@@ -31,7 +31,7 @@ def main(): ...@@ -31,7 +31,7 @@ def main():
exit(1) exit(1)
# Loading # Loading
lcpg = d2.D2Graph() lcpg = d2.LcpGraph()
lcpg.load(lcpg_name) lcpg.load(lcpg_name)
# Take the principal component # Take the principal component
......
...@@ -4,8 +4,8 @@ import networkx as nx ...@@ -4,8 +4,8 @@ import networkx as nx
import argparse import argparse
import sys import sys
from deconvolution.d2graph import d2_graph as d2, path_optimization as po from deconvolution.lcpgraph import lcp_graph as d2, path_optimization as po
from deconvolution.d2graph.d2_path import Path from deconvolution.lcpgraph.lcp_path import Path
def parse_arguments(): def parse_arguments():
...@@ -34,7 +34,7 @@ def main(): ...@@ -34,7 +34,7 @@ def main():
# Loading # Loading
G = nx.read_gexf(barcode_file) G = nx.read_gexf(barcode_file)
d2g = d2.D2Graph(G) d2g = d2.LcpGraph(G)
d2g.load(d2_file) d2g.load(d2_file)
# Take the principal component # Take the principal component
......
...@@ -4,7 +4,7 @@ import networkx as nx ...@@ -4,7 +4,7 @@ import networkx as nx
import argparse import argparse
import sys import sys
from deconvolution.d2graph import d2_graph as d2 from deconvolution.lcpgraph import lcp_graph as d2
def parse_arguments(): def parse_arguments():
...@@ -49,7 +49,7 @@ def main(): ...@@ -49,7 +49,7 @@ def main():
shutil.rmtree(debug_path) shutil.rmtree(debug_path)
os.mkdir(debug_path) os.mkdir(debug_path)
d2g = d2.D2Graph(debug=debug, debug_path=debug_path) d2g = d2.LcpGraph(debug=debug, debug_path=debug_path)
d2g.construct_from_barcodes( d2g.construct_from_barcodes(
barcode_graph, barcode_graph,
neighbor_threshold=args.edge_divergence_threshold, neighbor_threshold=args.edge_divergence_threshold,
......
...@@ -3,8 +3,8 @@ from distutils.core import setup ...@@ -3,8 +3,8 @@ from distutils.core import setup
setup( setup(
name='10X-deconvolve', name='10X-deconvolve',
version='0.1dev', version='0.2dev',
packages=['deconvolution.d2graph', 'deconvolution.dgraph', 'deconvolution.main', 'experiments', 'tests'], packages=['deconvolution.lcpgraph', 'deconvolution.dgraph', 'deconvolution.main', 'experiments', 'tests'],
license='AGPL V3', license='AGPL V3',
long_description=open('README.md').read(), long_description=open('README.md').read(),
) )
...@@ -3,7 +3,7 @@ import tempfile ...@@ -3,7 +3,7 @@ import tempfile
import networkx as nx import networkx as nx
from scipy.special import comb from scipy.special import comb
from deconvolution.d2graph.d2_graph import D2Graph from deconvolution.lcpgraph.lcp_graph import LcpGraph
from deconvolution.dgraph import graph_manipulator as gm from deconvolution.dgraph import graph_manipulator as gm
...@@ -14,18 +14,18 @@ class TestD2Graph(unittest.TestCase): ...@@ -14,18 +14,18 @@ class TestD2Graph(unittest.TestCase):
# size = 2 * d + 3 # size = 2 * d + 3
# #
# G = gm.generate_d_graph_chain(size, d) # G = gm.generate_d_graph_chain(size, d)
# d2 = D2Graph(G) # d2 = LcpGraph(G)
# print("before", d) # print("before", d)
# d2.construct_from_barcodes(neighbor_threshold=0, min_size_clique=d, verbose=False) # d2.construct_from_barcodes(neighbor_threshold=0, min_size_clique=d, verbose=False)
# print("after", d) # print("after", d)
# #
# # for dg in d2.all_d_graphs: # # for dg in d2.all_lcp:
# # print(dg.score, dg.get_link_divergence(), dg) # # print(dg.score, dg.get_link_divergence(), dg)
# # print() # # print()
# #
# # Test the number of d-graphs # # Test the number of d-graphs
# awaited_d_num = size - 2 * d # awaited_d_num = size - 2 * d
# self.assertEqual(awaited_d_num, len(d2.all_d_graphs)) # self.assertEqual(awaited_d_num, len(d2.all_lcp))
# #
# # Test connectivity # # Test connectivity
# # Center node names # # Center node names
...@@ -48,14 +48,14 @@ class TestD2Graph(unittest.TestCase): ...@@ -48,14 +48,14 @@ class TestD2Graph(unittest.TestCase):
def test_no_variability(self): def test_no_variability(self):
barcode_graph = nx.read_gexf("test_data/bar_1000_5_2.gexf") barcode_graph = nx.read_gexf("test_data/bar_1000_5_2.gexf")
d2 = D2Graph(barcode_graph) d2 = LcpGraph(barcode_graph)
d2.construct_from_barcodes() d2.construct_from_barcodes()
udgs = d2.all_d_graphs udgs = d2.all_lcp
for _ in range(5): for _ in range(5):
d2 = D2Graph(barcode_graph) d2 = LcpGraph(barcode_graph)
d2.construct_from_barcodes() d2.construct_from_barcodes()
self.assertEqual(len(udgs), len(d2.all_d_graphs)) self.assertEqual(len(udgs), len(d2.all_lcp))
def test_reloading(self): def test_reloading(self):
...@@ -65,7 +65,7 @@ class TestD2Graph(unittest.TestCase): ...@@ -65,7 +65,7 @@ class TestD2Graph(unittest.TestCase):
# Create a d2 graph # Create a d2 graph
G = gm.generate_d_graph_chain(size, d) G = gm.generate_d_graph_chain(size, d)
d2 = D2Graph(G) d2 = LcpGraph(G)
d2.construct_from_barcodes(verbose=False) d2.construct_from_barcodes(verbose=False)
# Save and reload the d2 in a temporary file # Save and reload the d2 in a temporary file
...@@ -74,7 +74,7 @@ class TestD2Graph(unittest.TestCase): ...@@ -74,7 +74,7 @@ class TestD2Graph(unittest.TestCase):
nx.write_gexf(d2, fp.name) nx.write_gexf(d2, fp.name)
# Reload # Reload
d2_reloaded = D2Graph(G) d2_reloaded = LcpGraph(G)
d2_reloaded.load(fp.name) d2_reloaded.load(fp.name)
# Test the nx graph # Test the nx graph
...@@ -84,11 +84,11 @@ class TestD2Graph(unittest.TestCase): ...@@ -84,11 +84,11 @@ class TestD2Graph(unittest.TestCase):
# TODO: Verify distances # TODO: Verify distances
# Test all_d_graphs # Test all_lcp
self.assertEqual(len(d2_reloaded.all_d_graphs), len(d2.all_d_graphs)) self.assertEqual(len(d2_reloaded.all_lcp), len(d2.all_lcp))
# Verify dg idxs # Verify dg idxs
reloaded_idxs = [dg.idx for dg in d2_reloaded.all_d_graphs] reloaded_idxs = [dg.idx for dg in d2_reloaded.all_lcp]
for dg in d2.all_d_graphs: for dg in d2.all_lcp:
self.assertTrue(dg.idx in reloaded_idxs) self.assertTrue(dg.idx in reloaded_idxs)
......