...

Commits (2)
 import networkx as nx # from deconvolution.d2graph.d2_path import Unitig """ Remove unnecessary transitions """ def transitive_reduction(d2g): # Remove self edges for edge in d2g.edges(): if edge[0] == edge[1]: d2g.remove_edge(*edge) # Analyse the graph for removable edges to_remove = set() for edge in d2g.edges(): dg1_name, dg2_name = edge # Extract dgs dg1 = d2g.node_by_idx[int(dg1_name)] dg2 = d2g.node_by_idx[int(dg2_name)] # Extract common neighbors nei1 = frozenset(d2g.neighbors(dg1_name)) nei2 = frozenset(d2g.neighbors(dg2_name)) common = nei1.intersection(nei2) # Look for all the common neighbors, if edge must be remove or not current_dist = d2g[dg1_name][dg2_name]["distance"] for node in common: com_dg = d2g.node_by_idx[int(node)] extern_dist = d2g[dg1_name][node]["distance"] + d2g[node][dg2_name]["distance"] # If better path, remove the edge if extern_dist <= current_dist: to_remove.add((dg1_name, dg2_name)) # Remove the edges for edge in to_remove: dg1_name, dg2_name = edge # Remove from graph d2g.remove_edge(dg1_name, dg2_name) print(f"{len(to_remove)} edge removed") print(f"{len(d2g.edges())} remaining") """ For each node of the d2 graph, construct a node in the reduced graph. Then, for each node, compute the closest neighbors in d2 (with equal scores) and add an edge in the greedy graph. @param d2 Input d2 graph (with distances already computed) @return A greedy constructed graph. """ def greedy_reduct(d2): gG = nx.Graph() for node in d2.nodes: gG.add_node(node) for dgraph, node in d2.nodes.items(): if not dgraph.idx in d2.distances or len(d2.distances[dgraph.idx]) == 0: continue distances = d2.distances[dgraph.idx] min_dist = min(distances.values()) for graph_idx, dist in distances.items(): if dist == min_dist: gG.add_edge(node, d2.nodes[d2.node_by_idx[graph_idx]]) return gG def filter_singeltons(graph): """ Remove the isolated nodes from graph. """ nodelist = list(graph.nodes()) for node in nodelist: if len(graph[node]) == 0: graph.remove_node(node) return graph # """ Compute the unambiguous paths in a d2g. The d2g must not contain singletons. # The unitigs are sorted by increasing penalty first and decreasing size second. # @param d2g a d2g graph # @return a list of unitigs # """ # def compute_unitigs(d2g): # unitigs = [] # used_nodes = {int(node): False for node in d2g.nodes()} # # for node_id in d2g.nodes(): # node_idx = int(node_id) # node = d2g.node_by_idx[node_idx] # # # If already tested # if used_nodes[node.idx]: # continue # used_nodes[node.idx] = True # # if d2g.degree(str(node.idx)) > 2: # # Not a unitig # continue # # # Create new unitigs # unitig = compute_unitig_from(d2g, node) # unitigs.append(unitig) # for node in unitig: # used_nodes[node.idx] = True # # # Sort the unitigs # unitigs.sort(key=lambda x: (x.normalized_penalty(), -len(x))) # # return unitigs # # # """ Compute a unitig starting from the node regarding the graph. # The unitig will be extended on both sides # @param d2g The d2g to look for unitig # @param node The start node of the unitig # @return The constructed unitig # """ # def compute_unitig_from(d2g, node): # unitig = Unitig(d2g) # unitig.add_path([node]) # if d2g.degree(str(node.idx)) == 2: # left, right = d2g.neighbors(str(node.idx)) # else: # left = next(d2g.neighbors(str(node.idx))) # right = None # # # Extends first side # prev_node = node # current_node = d2g.node_by_idx[int(left)] # unitig = extend_unitig(unitig, d2g, prev_node, current_node) # unitig.revert() # # # Extends second side # prev_node = node # current_node = d2g.node_by_idx[int(right)] if right is not None else None # unitig = extend_unitig(unitig, d2g, prev_node, current_node) # # return unitig # # # """ Directional extension of the unitig. Uses the previous and current nodes to detect the extension direction. # @param unitig The unitig to extend. Will be modified during the execution. # @param d2g The d2g to follow for extension # @param prev_node Node already added in the unitig # @param current_node Node to add into the unitig and used to select the next node to add. If not set, stop the extension. # @return Return the modified unitig. # """ # def extend_unitig(unitig, d2g, prev_node, current_node): # if current_node is None: # return unitig # # # Add the node # unitig.add_right(current_node) # # # Look for the next node # next_candidates = list(d2g.neighbors(str(current_node.idx))) # next_candidates.remove(str(prev_node.idx)) # # Select next node # next_node = d2g.node_by_idx[int(next_candidates[0])] if len(next_candidates) == 1 else None # # Continue extension # return extend_unitig(unitig, d2g, current_node, next_node)
 ... @@ -10,12 +10,12 @@ from deconvolution.dgraph.CliqueDGFactory import CliqueDGFactory ... @@ -10,12 +10,12 @@ from deconvolution.dgraph.CliqueDGFactory import CliqueDGFactory from deconvolution.dgraph.LouvainDGFactory import LouvainDGFactory from deconvolution.dgraph.LouvainDGFactory import LouvainDGFactory class D2Graph(nx.Graph): class LcpGraph(nx.Graph): """D2Graph (read it (d-graph)²)""" """LcpGraph""" def __init__(self, debug=False, debug_path='.'): def __init__(self, debug=False, debug_path='.'): super(D2Graph, self).__init__() super(LcpGraph, self).__init__() self.all_d_graphs = [] self.all_lcp = [] self.d_graphs_per_node = {} self.lcp_per_node = {} self.node_by_idx = {} self.node_by_idx = {} self.barcode_graph = None self.barcode_graph = None self.index = None self.index = None ... @@ -35,7 +35,7 @@ class D2Graph(nx.Graph): ... @@ -35,7 +35,7 @@ class D2Graph(nx.Graph): def subgraph(self, nodes): def subgraph(self, nodes): nodes = frozenset(nodes) nodes = frozenset(nodes) G = D2Graph(self.barcode_graph) G = LcpGraph(self.barcode_graph) G.barcode_edge_idxs = self.barcode_edge_idxs G.barcode_edge_idxs = self.barcode_edge_idxs # Add sub-nodes # Add sub-nodes ... @@ -75,20 +75,20 @@ class D2Graph(nx.Graph): ... @@ -75,20 +75,20 @@ class D2Graph(nx.Graph): # Compute all the d-graphs # Compute all the d-graphs if verbose: if verbose: print("Computing the unit d-graphs..") print("Computing the unit d-graphs..") dg_factory = None lcp_factory = None if clique_mode == "louvain": if clique_mode == "louvain": dg_factory = LouvainDGFactory(self.barcode_graph) lcp_factory = LouvainDGFactory(self.barcode_graph) else: else: dg_factory = CliqueDGFactory(self.barcode_graph, min_size_clique=min_size_clique, debug=self.debug, debug_path=self.debug_path) lcp_factory = CliqueDGFactory(self.barcode_graph, min_size_clique=min_size_clique, debug=self.debug, debug_path=self.debug_path) self.d_graphs_per_node = dg_factory.generate_all_dgraphs(threads=threads, verbose=verbose) self.lcp_per_node = lcp_factory.generate_all_dgraphs(threads=threads, verbose=verbose) if verbose: if verbose: counts = sum(len(x) for x in self.d_graphs_per_node.values()) counts = sum(len(x) for x in self.lcp_per_node.values()) print(f"\t {counts} computed d-graphs") print(f"\t {counts} computed d-graphs") for d_graphs in self.d_graphs_per_node.values(): for d_graphs in self.lcp_per_node.values(): self.all_d_graphs.extend(d_graphs) self.all_lcp.extend(d_graphs) # Number the d_graphs # Number the d_graphs for idx, d_graph in enumerate(self.all_d_graphs): for idx, d_graph in enumerate(self.all_lcp): d_graph.idx = idx d_graph.idx = idx self.node_by_idx[idx] = d_graph self.node_by_idx[idx] = d_graph ... @@ -133,7 +133,7 @@ class D2Graph(nx.Graph): ... @@ -133,7 +133,7 @@ class D2Graph(nx.Graph): dg = Dgraph.load(data["udg"], data["score"], data["barcode_edges"]) dg = Dgraph.load(data["udg"], data["score"], data["barcode_edges"]) self.variables.update(dg.edges) self.variables.update(dg.edges) self.bidict_nodes[node] = dg self.bidict_nodes[node] = dg self.all_d_graphs.append(dg) self.all_lcp.append(dg) if dg.idx == -1: if dg.idx == -1: dg.idx = int(node) dg.idx = int(node) self.node_by_idx[dg.idx] = dg self.node_by_idx[dg.idx] = dg ... @@ -141,59 +141,33 @@ class D2Graph(nx.Graph): ... @@ -141,59 +141,33 @@ class D2Graph(nx.Graph): self.bidict_nodes = bidict(self.bidict_nodes) self.bidict_nodes = bidict(self.bidict_nodes) def create_index_from_tuples(self, tuple_size=3, verbose=True): index = {} if verbose: print("\tIndex d-graphs") for lst_idx, dg in enumerate(self.all_d_graphs): if verbose: sys.stdout.write(f"\r\t{lst_idx+1}/{len(self.all_d_graphs)}") sys.stdout.flush() nodelist = dg.to_sorted_list() if len(nodelist) < tuple_size: continue # Generate all tuplesize-mers for dmer in itertools.combinations(nodelist, tuple_size): if dmer not in index: index[dmer] = set() index[dmer].add(dg) if verbose: print() return index def create_graph_from_node_neighborhoods(self, neighborhood_threshold=0.25): def create_graph_from_node_neighborhoods(self, neighborhood_threshold=0.25): nodes = {} nodes = {} # Create the nodes of d2g from udgs # Create the nodes of lcpg from udgs for dg in self.all_d_graphs: for lcp in self.all_lcp: nodes[dg] = dg.idx nodes[lcp] = lcp.idx self.add_node(nodes[dg]) self.add_node(nodes[lcp]) # Add covering barcode edges # Add covering barcode edges barcode_edges = " ".join([str(x) for x in dg.edges]) barcode_edges = " ".join([str(x) for x in lcp.edges]) self.nodes[nodes[dg]]["barcode_edges"] = barcode_edges self.nodes[nodes[lcp]]["barcode_edges"] = barcode_edges self.nodes[nodes[dg]]["score"] = f"{dg.score}/{dg.get_optimal_score()}" self.nodes[nodes[lcp]]["score"] = f"{lcp.score}/{lcp.get_optimal_score()}" self.nodes[nodes[dg]]["udg"] = str(dg) self.nodes[nodes[lcp]]["udg"] = str(lcp) self.nodes[nodes[dg]]["central_node_barcode"] = str(dg.center) self.nodes[nodes[lcp]]["central_node_barcode"] = str(lcp.center) # Create the edges from neighbor edges # Create the edges from neighbor edges for dg in self.all_d_graphs: for lcp in self.all_lcp: for node in dg.to_node_set(): for node in lcp.to_node_set(): if node == dg.center: if node == lcp.center: continue continue entry = frozenset({node}) entry = frozenset({node}) if entry in self.d_graphs_per_node: if entry in self.lcp_per_node: colliding_dgs = self.d_graphs_per_node[entry] colliding_dgs = self.lcp_per_node[entry] for colliding_dg in colliding_dgs: for colliding_lcp in colliding_dgs: distance = dg.distance_to(colliding_dg) distance = lcp.distance_to(colliding_lcp) distance_ratio = distance / (len(dg.nodes) + len(colliding_dg.nodes)) distance_ratio = distance / (len(lcp.nodes) + len(colliding_lcp.nodes)) if distance_ratio <= neighborhood_threshold: if distance_ratio <= neighborhood_threshold: self.add_edge(nodes[dg], nodes[colliding_dg], distance=distance) self.add_edge(nodes[lcp], nodes[colliding_lcp], distance=distance) # Filter out singletons # Filter out singletons graph_nodes = list(nodes) graph_nodes = list(nodes) ... @@ -203,45 +177,3 @@ class D2Graph(nx.Graph): ... @@ -203,45 +177,3 @@ class D2Graph(nx.Graph): del nodes[n] del nodes[n] return bidict(nodes) return bidict(nodes) def create_graph_from_index(self): nodes = {} for dmer in self.index: dgs = list(set(self.index[dmer])) for d_idx, dg in enumerate(dgs): # Create a node name if dg not in nodes: nodes[dg] = dg.idx # Add the node self.add_node(nodes[dg]) # Add covering barcode edges barcode_edges = " ".join([str(self.barcode_edge_idxs[x]) for x in dg.edges]) self.nodes[nodes[dg]]["barcode_edges"] = barcode_edges self.nodes[nodes[dg]]["score"] = f"{dg.score}/{dg.get_optimal_score()}" self.nodes[nodes[dg]]["udg"] = str(dg) # Add the edges for prev_idx in range(d_idx): prev_dg = dgs[prev_idx] # Add on small distances d = dg.distance_to(prev_dg) if d <= min(len(dg.node_set)/2, len(prev_dg.node_set)/2): self.add_edge(nodes[dg], nodes[prev_dg], distance=d) return bidict(nodes) def compute_distances(self): for x, y, data in self.edges(data=True): dg1 = self.node_by_idx[x] dg2 = self.node_by_idx[y] if dg1 == dg2: continue # Distance computing and adding in the dist dicts d = dg1.distance_to(dg2) data["distance"] = d
 ... @@ -3,14 +3,14 @@ from collections import Counter ... @@ -3,14 +3,14 @@ from collections import Counter import sys import sys """ Represent an udg path into a d2g graph """ Represent an udg path into a lcpg graph """ """ class Path(list): class Path(list): def __init__(self, d2g): def __init__(self, lcpg): super(Path, self).__init__() super(Path, self).__init__() self.d2g = d2g self.lcpg = lcpg self.covering_variables = {x: 0 for x in self.d2g.variables} self.covering_variables = {x: 0 for x in self.lcpg.variables} self.covering_value = 0 self.covering_value = 0 # a succession of Counter (multiset) # a succession of Counter (multiset) self.barcode_order = [] self.barcode_order = [] ... @@ -19,7 +19,7 @@ class Path(list): ... @@ -19,7 +19,7 @@ class Path(list): self.size_score = 0 self.size_score = 0 def append(self, obj) -> None: def append(self, obj) -> None: lcp = self.d2g.get_lcp(obj) lcp = self.lcpg.get_lcp(obj) # Update the covering variables # Update the covering variables for edge_idx in lcp.edges: for edge_idx in lcp.edges: ... @@ -100,7 +100,7 @@ class Path(list): ... @@ -100,7 +100,7 @@ class Path(list): self.barcode_order.pop(set_idx) self.barcode_order.pop(set_idx) def copy(self): def copy(self): copy = Path(self.d2g) copy = Path(self.lcpg) # Copy the list # Copy the list for x in self: for x in self: ... ...
 import networkx as nx # from deconvolution.lcpgraph.d2_path import Unitig def transitive_reduction(lcpg): """ Remove unnecessary transitions """ # Remove self edges for edge in lcpg.edges(): if edge[0] == edge[1]: lcpg.remove_edge(*edge) # Analyse the graph for removable edges to_remove = set() for edge in lcpg.edges(): lcp1_name, lcp2_name = edge # Extract common neighbors nei1 = frozenset(lcpg.neighbors(lcp1_name)) nei2 = frozenset(lcpg.neighbors(lcp2_name)) common = nei1.intersection(nei2) # Look for all the common neighbors, if edge must be remove or not current_dist = lcpg[lcp1_name][lcp2_name]["distance"] for node in common: com_dg = lcpg.node_by_idx[int(node)] extern_dist = lcpg[lcp1_name][node]["distance"] + lcpg[node][lcp2_name]["distance"] # If better path, remove the edge if extern_dist <= current_dist: to_remove.add((lcp1_name, lcp2_name)) # Remove the edges for edge in to_remove: lcp1_name, lcp2_name = edge # Remove from graph lcpg.remove_edge(lcp1_name, lcp2_name) print(f"{len(to_remove)} edge removed") print(f"{len(lcpg.edges())} remaining")
 from deconvolution.d2graph.d2_path import Path from deconvolution.lcpgraph.lcp_path import Path """ Greedy algorithm. Start with th most probable unitig (ie lowest normalized penalty first and largest unitig for """ Greedy algorithm. Start with th most probable unitig (ie lowest normalized penalty first and largest unitig for equalities). Then extends on both side to the nearest interesting unitig. equalities). Then extends on both side to the nearest interesting unitig. ... @@ -34,7 +34,7 @@ def construct_path_from_unitigs(unitigs, d2g): ... @@ -34,7 +34,7 @@ def construct_path_from_unitigs(unitigs, d2g): """ Look for the next unitig to join from right endpoint of the path """ Look for the next unitig to join from right endpoint of the path @param path Currently created path @param path Currently created path @param unitigs List of unitigs of interest (be careful to fill this list with unitigs that add new covered variables @param unitigs List of unitigs of interest (be careful to fill this list with unitigs that add new covered variables @param d2g The d2g to use as path support @param lcpg The lcpg to use as path support @return (path_to, utg) Respectively the path to the next unitig of interest (from the right of the current path) and @return (path_to, utg) Respectively the path to the next unitig of interest (from the right of the current path) and the corresponding unitig (linked to the path from left to right) the corresponding unitig (linked to the path from left to right) """ """ ... @@ -75,8 +75,8 @@ def _search_way_to_next_unitig(path, unitigs, d2g): ... @@ -75,8 +75,8 @@ def _search_way_to_next_unitig(path, unitigs, d2g): """ Search the min-penalty paths from a start udg to any of the target nodes. """ Search the min-penalty paths from a start udg to any of the target nodes. @param start_udg The start node of the path @param start_udg The start node of the path @param targets A list of udg of interest @param targets A list of udg of interest @param d2g The d2g to follow @param lcpg The lcpg to follow @param forbidden_udgs excluded nodes from the d2g @param forbidden_udgs excluded nodes from the lcpg @return A list of equivalent paths @return A list of equivalent paths """ """ def _search_endpoint(start_udg, targets, d2g, forbidden_udgs): def _search_endpoint(start_udg, targets, d2g, forbidden_udgs): ... ...
 import random import random from collections import Counter from collections import Counter from deconvolution.d2graph.d2_path import Path from deconvolution.lcpgraph.lcp_path import Path class Optimizer: class Optimizer: ... ...
 ... @@ -4,8 +4,8 @@ import networkx as nx ... @@ -4,8 +4,8 @@ import networkx as nx import argparse import argparse import sys import sys from deconvolution.d2graph import d2_graph as d2 from deconvolution.lcpgraph import lcp_graph as d2 from deconvolution.d2graph import d2_algorithms as d2a from deconvolution.lcpgraph import lcpg_algorithms as d2a def parse_arguments(): def parse_arguments(): ... @@ -33,7 +33,7 @@ def main(): ... @@ -33,7 +33,7 @@ def main(): # Loading # Loading print("--- lcp graph loading ---") print("--- lcp graph loading ---") lcpg = d2.D2Graph() lcpg = d2.LcpGraph() lcpg.load(lcpg_name) lcpg.load(lcpg_name) # Algorithms for reduction # Algorithms for reduction ... ...
 ... @@ -4,7 +4,7 @@ import networkx as nx ... @@ -4,7 +4,7 @@ import networkx as nx import argparse import argparse import sys import sys from deconvolution.d2graph import d2_graph as d2, path_optimization as po from deconvolution.lcpgraph import lcp_graph as d2, path_optimization as po def parse_arguments(): def parse_arguments(): ... @@ -31,7 +31,7 @@ def main(): ... @@ -31,7 +31,7 @@ def main(): exit(1) exit(1) # Loading # Loading lcpg = d2.D2Graph() lcpg = d2.LcpGraph() lcpg.load(lcpg_name) lcpg.load(lcpg_name) # Take the principal component # Take the principal component ... ...
 ... @@ -4,8 +4,8 @@ import networkx as nx ... @@ -4,8 +4,8 @@ import networkx as nx import argparse import argparse import sys import sys from deconvolution.d2graph import d2_graph as d2, path_optimization as po from deconvolution.lcpgraph import lcp_graph as d2, path_optimization as po from deconvolution.d2graph.d2_path import Path from deconvolution.lcpgraph.lcp_path import Path def parse_arguments(): def parse_arguments(): ... @@ -34,7 +34,7 @@ def main(): ... @@ -34,7 +34,7 @@ def main(): # Loading # Loading G = nx.read_gexf(barcode_file) G = nx.read_gexf(barcode_file) d2g = d2.D2Graph(G) d2g = d2.LcpGraph(G) d2g.load(d2_file) d2g.load(d2_file) # Take the principal component # Take the principal component ... ...
 ... @@ -4,7 +4,7 @@ import networkx as nx ... @@ -4,7 +4,7 @@ import networkx as nx import argparse import argparse import sys import sys from deconvolution.d2graph import d2_graph as d2 from deconvolution.lcpgraph import lcp_graph as d2 def parse_arguments(): def parse_arguments(): ... @@ -49,7 +49,7 @@ def main(): ... @@ -49,7 +49,7 @@ def main(): shutil.rmtree(debug_path) shutil.rmtree(debug_path) os.mkdir(debug_path) os.mkdir(debug_path) d2g = d2.D2Graph(debug=debug, debug_path=debug_path) d2g = d2.LcpGraph(debug=debug, debug_path=debug_path) d2g.construct_from_barcodes( d2g.construct_from_barcodes( barcode_graph, barcode_graph, neighbor_threshold=args.edge_divergence_threshold, neighbor_threshold=args.edge_divergence_threshold, ... ...
 ... @@ -3,8 +3,8 @@ from distutils.core import setup ... @@ -3,8 +3,8 @@ from distutils.core import setup setup( setup( name='10X-deconvolve', name='10X-deconvolve', version='0.1dev', version='0.2dev', packages=['deconvolution.d2graph', 'deconvolution.dgraph', 'deconvolution.main', 'experiments', 'tests'], packages=['deconvolution.lcpgraph', 'deconvolution.dgraph', 'deconvolution.main', 'experiments', 'tests'], license='AGPL V3', license='AGPL V3', long_description=open('README.md').read(), long_description=open('README.md').read(), ) )
 ... @@ -3,7 +3,7 @@ import tempfile ... @@ -3,7 +3,7 @@ import tempfile import networkx as nx import networkx as nx from scipy.special import comb from scipy.special import comb from deconvolution.d2graph.d2_graph import D2Graph from deconvolution.lcpgraph.lcp_graph import LcpGraph from deconvolution.dgraph import graph_manipulator as gm from deconvolution.dgraph import graph_manipulator as gm ... @@ -14,18 +14,18 @@ class TestD2Graph(unittest.TestCase): ... @@ -14,18 +14,18 @@ class TestD2Graph(unittest.TestCase): # size = 2 * d + 3 # size = 2 * d + 3 # # # G = gm.generate_d_graph_chain(size, d) # G = gm.generate_d_graph_chain(size, d) # d2 = D2Graph(G) # d2 = LcpGraph(G) # print("before", d) # print("before", d) # d2.construct_from_barcodes(neighbor_threshold=0, min_size_clique=d, verbose=False) # d2.construct_from_barcodes(neighbor_threshold=0, min_size_clique=d, verbose=False) # print("after", d) # print("after", d) # # # # for dg in d2.all_d_graphs: # # for dg in d2.all_lcp: # # print(dg.score, dg.get_link_divergence(), dg) # # print(dg.score, dg.get_link_divergence(), dg) # # print() # # print() # # # # Test the number of d-graphs # # Test the number of d-graphs # awaited_d_num = size - 2 * d # awaited_d_num = size - 2 * d # self.assertEqual(awaited_d_num, len(d2.all_d_graphs)) # self.assertEqual(awaited_d_num, len(d2.all_lcp)) # # # # Test connectivity # # Test connectivity # # Center node names # # Center node names ... @@ -48,14 +48,14 @@ class TestD2Graph(unittest.TestCase): ... @@ -48,14 +48,14 @@ class TestD2Graph(unittest.TestCase): def test_no_variability(self): def test_no_variability(self): barcode_graph = nx.read_gexf("test_data/bar_1000_5_2.gexf") barcode_graph = nx.read_gexf("test_data/bar_1000_5_2.gexf") d2 = D2Graph(barcode_graph) d2 = LcpGraph(barcode_graph) d2.construct_from_barcodes() d2.construct_from_barcodes() udgs = d2.all_d_graphs udgs = d2.all_lcp for _ in range(5): for _ in range(5): d2 = D2Graph(barcode_graph) d2 = LcpGraph(barcode_graph) d2.construct_from_barcodes() d2.construct_from_barcodes() self.assertEqual(len(udgs), len(d2.all_d_graphs)) self.assertEqual(len(udgs), len(d2.all_lcp)) def test_reloading(self): def test_reloading(self): ... @@ -65,7 +65,7 @@ class TestD2Graph(unittest.TestCase): ... @@ -65,7 +65,7 @@ class TestD2Graph(unittest.TestCase): # Create a d2 graph # Create a d2 graph G = gm.generate_d_graph_chain(size, d) G = gm.generate_d_graph_chain(size, d) d2 = D2Graph(G) d2 = LcpGraph(G) d2.construct_from_barcodes(verbose=False) d2.construct_from_barcodes(verbose=False) # Save and reload the d2 in a temporary file # Save and reload the d2 in a temporary file ... @@ -74,7 +74,7 @@ class TestD2Graph(unittest.TestCase): ... @@ -74,7 +74,7 @@ class TestD2Graph(unittest.TestCase): nx.write_gexf(d2, fp.name) nx.write_gexf(d2, fp.name) # Reload # Reload d2_reloaded = D2Graph(G) d2_reloaded = LcpGraph(G) d2_reloaded.load(fp.name) d2_reloaded.load(fp.name) # Test the nx graph # Test the nx graph ... @@ -84,11 +84,11 @@ class TestD2Graph(unittest.TestCase): ... @@ -84,11 +84,11 @@ class TestD2Graph(unittest.TestCase): # TODO: Verify distances # TODO: Verify distances # Test all_d_graphs # Test all_lcp self.assertEqual(len(d2_reloaded.all_d_graphs), len(d2.all_d_graphs)) self.assertEqual(len(d2_reloaded.all_lcp), len(d2.all_lcp)) # Verify dg idxs # Verify dg idxs reloaded_idxs = [dg.idx for dg in d2_reloaded.all_d_graphs] reloaded_idxs = [dg.idx for dg in d2_reloaded.all_lcp] for dg in d2.all_d_graphs: for dg in d2.all_lcp: self.assertTrue(dg.idx in reloaded_idxs) self.assertTrue(dg.idx in reloaded_idxs) ... ...