Commit e7f0a973 by Yoann Dufresne

### unitigs generation ok

parent 717a43e4
 ... ... @@ -8,7 +8,7 @@ from d2_path import Path, Unitig """ def transitive_reduction(d2g): edges = list(d2g.edges(data=True)) # Remove self edges for edge in edges: if edge[0] == edge[1]: ... ... @@ -90,70 +90,74 @@ def filter_singeltons(graph): return graph """ Compute the unambiguous paths in a graph. The graph must not contain singletons.s @param graph a networkx graph """ Compute the unambiguous paths in a d2g. The d2g must not contain singletons. @param d2g a d2g graph @return a list of unitigs """ def compute_unitigs(graph): def compute_unitigs(d2g): unitigs = [] used_nodes = {node:False for node in graph.nodes()} current_unitig = Unitig() used_nodes = {int(node): False for node in d2g.nodes()} for node_id in d2g.nodes(): node_idx = int(node_id) node = d2g.node_by_idx[node_idx] for node in graph.nodes(): # If already tested if used_nodes[node]: if used_nodes[node.idx]: continue used_nodes[node] = True used_nodes[node.idx] = True if graph.degree(node) > 2: if d2g.degree(str(node.idx)) > 2: # Not a unitig continue # Create new unitigs unitig = compute_unitig_from(graph, node) unitig = compute_unitig_from(d2g, node) unitigs.append(unitig) for node in unitig: used_nodes[node] = True used_nodes[node.idx] = True return unitigs """ Compute a unitig starting from the node regarding the graph. The unitig will be extended on both sides @param graph The graph to look for unitig @param d2g The d2g to look for unitig @param node The start node of the unitig @return The constructed unitig """ def compute_unitig_from(graph, node): unitig = Unitig(nodes=[node]) if graph.degree(node) == 2: left, right = graph.neighbors(node) def compute_unitig_from(d2g, node): unitig = Unitig(udgs=[node]) if d2g.degree(str(node.idx)) == 2: left, right = d2g.neighbors(str(node.idx)) else: left = next(graph.neighbors(node)) left = next(d2g.neighbors(str(node.idx))) right = None # Extends first side prev_node = node current_node = left unitig = extend_unitig(unitig, graph, prev_node, current_node) current_node = d2g.node_by_idx[int(left)] unitig = extend_unitig(unitig, d2g, prev_node, current_node) unitig.revert() # Extends second side prev_node = node current_node = right unitig = extend_unitig(unitig, graph, prev_node, current_node) current_node = d2g.node_by_idx[int(right)] if right != None else None unitig = extend_unitig(unitig, d2g, prev_node, current_node) return unitig """ Directional extension of the unitig. Uses the previous and current nodes to detect the extension direction. @param unitig The unitig to extend. Will be modified during the execution. @param graph The graph to follow for extension @param d2g The d2g to follow for extension @param prev_node Node already added in the unitig @param current_node Node to add into the unitig and used to select the next node to add. If not set, stop the extension. @return Return the modified unitig. """ def extend_unitig(unitig, graph, prev_node, current_node): def extend_unitig(unitig, d2g, prev_node, current_node): if current_node == None: return unitig ... ... @@ -161,10 +165,9 @@ def extend_unitig(unitig, graph, prev_node, current_node): unitig.add_right(current_node) # Look for the next node next_candidates = list(graph.neighbors(current_node)) next_candidates.remove(prev_node) next_candidates = list(d2g.neighbors(str(current_node.idx))) next_candidates.remove(str(prev_node.idx)) # Select next node next_node = next_candidates[0] if len(next_candidates) == 1 else None next_node = d2g.node_by_idx[int(next_candidates[0])] if len(next_candidates) == 1 else None # Continue extension return extend_unitig(unitig, graph, current_node, next_node) return extend_unitig(unitig, d2g, current_node, next_node)
 ... ... @@ -9,6 +9,9 @@ class D2Graph(nx.Graph): """D2Graph (read it (d-graph)²)""" def __init__(self, barcode_graph): super(D2Graph, self).__init__() self.all_d_graphs = [] self.d_graphs_per_node = {} self.node_by_idx = {} self.barcode_graph = barcode_graph # Number the edges from original graph ... ... @@ -26,9 +29,25 @@ class D2Graph(nx.Graph): """ Redefine subgraph to avoid errors type instantiation errors. """ def subgraph(self, nodes): G = nx.Graph() G.add_edges_from(self.edges()) return G.subgraph(nodes) nodes = frozenset(nodes) G = D2Graph(self.barcode_graph) G.barcode_edge_idxs = self.barcode_edge_idxs # Add sub-nodes for node in nodes: G.add_node(node) G.nodes[node].update(self.nodes[node]) # Add edges for node1, node2, data in self.edges(data=True): if node1 in nodes and node2 in nodes: G.add_edge(node1, node2, distance=data["distance"]) # Node by idx G.node_by_idx = self.node_by_idx return G def construct_from_barcodes(self, index_size=3, verbose=True, debug=False): ... ... @@ -37,18 +56,15 @@ class D2Graph(nx.Graph): print("Compute the unit d-graphs") self.d_graphs_per_node = compute_all_max_d_graphs(self.barcode_graph, debug=debug) self.d_graphs_per_node = filter_dominated(self.d_graphs_per_node) self.all_d_graphs = [] for d_graphs in self.d_graphs_per_node.values(): self.all_d_graphs.extend(d_graphs) # Name the d-graphs # Number the d_graphs self.node_by_idx = {} self.node_by_name = {} for idx, d_graph in enumerate(self.all_d_graphs): d_graph.idx = idx self.node_by_idx[idx] = d_graph self.node_by_name[str(d_graph)] = d_graph # self.node_by_name[str(d_graph)] = d_graph # Index all the d-graphes if verbose: ... ... @@ -87,9 +103,7 @@ class D2Graph(nx.Graph): self.add_edge(edge[0], edge[1], distance=edge[2]["distance"]) # Extract d-graphs from nx graph self.all_d_graphs = [] self.node_by_idx = {} self.node_by_name = {} # self.node_by_name = {} self.bidict_nodes = {} for idx, node in enumerate(self.nodes(data=True)): node, data = node ... ... @@ -97,9 +111,9 @@ class D2Graph(nx.Graph): self.bidict_nodes[node] = dg self.all_d_graphs.append(dg) if dg.idx == -1: dg.idx = idx dg.idx = int(node) self.node_by_idx[dg.idx] = dg self.node_by_name[node] = dg # self.node_by_name[node] = dg self.bidict_nodes = bidict(self.bidict_nodes) ... ... @@ -232,4 +246,3 @@ class D2Graph(nx.Graph): del self.index[dmer] \ No newline at end of file
 import networkx as nx """ Represent an udg path into a d2g graph """ class Path(list): def __init__(self, nodes=[]): def __init__(self, udgs=[]): super(Path, self).__init__() self.nodes = [x for x in nodes] self.udgs = [x for x in udgs] def add_nodes(self, nodes): self.nodes.extend(nodes) def add_udgs(self, udgs): self.udgs.extend(udgs) def add_path(self, path): self.add_nodes(path.nodes) self.add_udgs(path.udgs) def revert(self): self.nodes = [x for x in self.nodes[::-1]] self.udgs = [x for x in self.udgs[::-1]] def get_score(self, d2g): return 0 def get_penalty(self, d2g): penalty = 0 for idx, node in enumerate(self.udgs[1:]): prev_node = self.udgs[idx-1] penalty += pow() return penalty def __repr__(self): return f"[{','.join([str(x) for x in self.nodes])}]" return f"[{','.join([str(x) for x in self.udgs])}]" class Unitig(Path): def __init__(self, nodes=[]): super(Unitig, self).__init__(nodes) def __init__(self, udgs=[]): super(Unitig, self).__init__(udgs) def add_left(self, node): self.nodes.insert(0,node) self.udgs.insert(0,node) def add_right(self, node): self.nodes.append(node) self.udgs.append(node)
 ... ... @@ -10,8 +10,8 @@ from d2_algorithms import compute_unitigs def parse_arguments(): parser = argparse.ArgumentParser(description='Greedy construction of a path through the d2 graph.') parser.add_argument('barcode_graph', help='The barcode graph file. Must be a gefx formated file.') parser.add_argument('d2_graph', help='d2 graph to reduce. Must be a gefx formated file.') parser.add_argument('barcode_graph', help='The barcode graph file. Must be a gefx formatted file.') parser.add_argument('d2_graph', help='d2 graph to reduce. Must be a gexf formatted file.') parser.add_argument('--output_path', '-o', default="d2_path.gexf", help="Output file prefix.") args = parser.parse_args() ... ... @@ -38,7 +38,7 @@ def main(): largest_component = d2g.subgraph(largest_component_nodes) unitigs = compute_unitigs(largest_component) # Write the simplificated graph # Write the simplified graph # nx.write_gexf(d2g.nx_graph, args.output_d2_name) ... ...