Commit 9adec63c by Yoann Dufresne

### unitigs to graph in progress

parent a652cd40
 ... ... @@ -131,8 +131,8 @@ def compute_unitigs(d2g): @return The constructed unitig """ def compute_unitig_from(d2g, node): unitig = Unitig() unitig.add_udgs([node]) unitig = Unitig(d2g) unitig.add_path([node]) if d2g.degree(str(node.idx)) == 2: left, right = d2g.neighbors(str(node.idx)) else: ... ...
 ... ... @@ -5,12 +5,13 @@ import networkx as nx """ class Path(list): def __init__(self): def __init__(self, d2g): super(Path, self).__init__() self.penalty = 0 self.d2g = d2g self.covering_variables = {x: 0 for x in self.d2g.barcode_edge_idxs.values()} def add_udgs(self, udgs): def add_path(self, udgs): if len(udgs) == 0: return ... ... @@ -24,14 +25,16 @@ class Path(list): # Add udg one by one for udg in udgs: # Compute distance dist = udg.distance_to(self[-1]) dist = self.d2g[str(udg.idx)][str(self[-1].idx)]["distance"] # Update penalty regarding distance self.penalty += pow(dist, 2) # Add the node self.append(udg) def add_path(self, path): self.add_udgs(path.udgs) # Register edges for barcode_edge in udg.edges: edge_idx = self.d2g.barcode_edge_idxs[barcode_edge] self.covering_variables[edge_idx] += 1 def revert(self): self.reverse() ... ... @@ -39,14 +42,35 @@ class Path(list): def normalized_penalty(self): return self.penalty / len(self) def covering_score(self): covered_num = 0 for val in self.covering_variables.values(): if val > 0: covered_num += 1 return covered_num / len(self.covering_variables) """ Count the number of variable covered by path that are not self covered. """ def covering_difference(self, path): self_covered = [var for var, num in self.covering_variables.items() if num > 0] self_covered = frozenset(self_covered) num_covered = 0 for var, val in path.covering_variables.items(): if val > 0 and var not in self_covered: num_covered += 1 return num_covered class Unitig(Path): def __init__(self): super(Unitig, self).__init__() def __init__(self, d2g): super(Unitig, self).__init__(d2g) def add_right(self, udg): self.add_udgs([udg]) self.add_path([udg])
 ... ... @@ -5,6 +5,7 @@ import argparse import sys import d2_graph as d2 import path_algorithms as pa from d2_algorithms import compute_unitigs ... ... @@ -38,8 +39,8 @@ def main(): largest_component = d2g.subgraph(largest_component_nodes) unitigs = compute_unitigs(largest_component) for ut in unitigs: print(ut.normalized_penalty(), len(ut)) path = pa.construct_path_from_unitigs(unitigs, largest_component) print(path.covering_score()) # Write the simplified graph # nx.write_gexf(d2g.nx_graph, args.output_d2_name) ... ...
 import networkx as nx from d2_path import Path """ Greedy algorithm. Start with th most probable unitig (ie lowest normalized penalty first and largest unitig for equalities). Then extends on both side to the nearest interesting unitig. an interesting unitig is a unitig that add new covering variables. """ def construct_path_from_unitigs(unitigs, d2g): # Initiate covering variables used_variables = {x: False for x in d2g.barcode_edge_idxs} # Initiate the path with longest sure unitig path = Path(d2g) first_utg = unitigs.pop(0) path.add_path(first_utg) print(len(path), path.covering_score()) # While it's possible, search to add unitigs while len(unitigs) > 0: # Search for next unitig path_extension = _search_way_to_next_unitig(path, unitigs, d2g) if path_extension is None: break # TODO: Search for unitigs on the other side of the path # Add the unitig to the path path.add_path(path_extension) print(len(path), path.covering_score()) unitigs = [utg for utg in unitigs if path.covering_difference(utg) > 0] print() return path """ Look for the next unitig to join from right endpoint of the path @param path Currently created path @param unitigs List of unitigs of interest (be careful to fill this list with unitigs that add new covered variables @param d2g The d2g to use as path support @return (path_to, utg) Respectively the path to the next unitig of interest (from the right of the current path) and the corresponding unitig (linked to the path from left to right) """ def _search_way_to_next_unitig(path, unitigs, d2g): # Register endpoints endpoints = {} for utg in unitigs: endpoints[utg[0]] = utg if len(utg) > 1: endpoints[utg[-1]] = utg # Search for next endpoint best_paths = _search_endpoint(path[-1], endpoints, d2g, path) if len(best_paths) == 0: return None # Search for the best path (unitig included) to add (The one that cover the most new variables) the_best_path = None covered_variables = 0 for extension in best_paths: utg = endpoints[extension[-1]] # if the utg is in the wrong size if utg[-1] == path[-1]: utg.reverse() complete_path = Path(d2g) complete_path.add_path(extension[1:]) complete_path.add_path(utg[1:]) num_new_vars = path.covering_difference(complete_path) if num_new_vars > covered_variables: covered_variables = num_new_vars the_best_path = complete_path return the_best_path """ Search the min-penalty paths from a start udg to any of the target nodes. @param start_udg The start node of the path @param targets A list of udg of interest @param d2g The d2g to follow @param forbidden_udgs excluded nodes from the d2g @return A list of equivalent paths """ def _search_endpoint(start_udg, targets, d2g, forbidden_udgs): marked_nodes = {x: (None, None) for x in forbidden_udgs} # Init Dijkstra to_explore = [start_udg] marked_nodes[start_udg] = (0, None) found = [] # Dijkstra part 1, compute penalties and previous nodes while len(to_explore) > 0 and len(found) == 0: # Select min penalty in to_explore current = min(to_explore, key=lambda x: marked_nodes[x][0]) current_penalty = marked_nodes[current][0] neighbors = d2g.neighbors(str(current.idx)) # Explore all the neighbors of the current node. for nei_idx in neighbors: nei_udg = d2g.node_by_idx[int(nei_idx)] penalty = current_penalty + d2g[nei_idx][str(current.idx)]["distance"] if nei_udg in marked_nodes: # Forbidden node if marked_nodes[nei_udg][0] is None: continue # Already inserted but update the min penalty to reach the udg if penalty < marked_nodes[nei_udg][0]: marked_nodes[nei_udg] = penalty, current else: # First encounter: insert to explore and set min penalty marked_nodes[nei_udg] = (penalty, current) to_explore.append(nei_udg) # Set stop if a target is found if nei_udg in targets: if len(found) == 0: found.append(nei_udg) elif marked_nodes[found[0]][0] == penalty: found.append(nei_udg) elif marked_nodes[found[0]][0] > penalty: found = [nei_udg] # Dijkstra part 2, retrieve the best paths paths = [] for target in found: # Start a path node_path = [target] # Add all nodes one by one to the first while node_path[-1] != start_udg: node_path.append(marked_nodes[node_path[-1]][1]) # Reverse the path to be on the right order node_path.reverse() # Create and add a real path object path = Path(d2g) path.add_path(node_path) paths.append(path) return paths
