Commit cf3ae74c by Yoann Dufresne

### save d2_graph

parent 0d701c92
 import networkx as nx import itertools from bidict import bidict from d_graph import compute_all_max_d_graphs ... ... @@ -13,7 +14,24 @@ class D2Graph(object): # Compute all the d-graphs if verbose: print("Compute the unit d-graphs") self.d_graphs = compute_all_max_d_graphs(self.graph) self.d_graphs_per_node = compute_all_max_d_graphs(self.graph) self.all_d_graphs = [] for d_graphs in self.d_graphs_per_node.values(): self.all_d_graphs.extend(d_graphs) # Name the d-graphs # Number the d_graphs for idx, d_graph in enumerate(self.all_d_graphs): d_graph.idx = idx # Number the edges from original graph self.edge_idxs = {} self.nb_uniq_edge = 0 for idx, edge in enumerate(self.graph.edges()): if edge == (edge[1], edge[0]): self.nb_uniq_edge += 1 self.edge_idxs[edge] = idx self.edge_idxs[(edge[1], edge[0])] = idx # Index all the d-graphes if verbose: ... ... @@ -25,59 +43,58 @@ class D2Graph(object): self.distances = self.compute_distances() # Create the graph self.graph, self.nodes = self.to_nx_graph() self.nx_graph, self.nodes = self.to_nx_graph() def save(self, filename): with open(filename, "w") as fp: # First line nb_nodes nb_cov_var fp.write(f"{len(self.all_d_graphs)} {int((len(self.edge_idxs)+self.nb_uniq_edge)/2)}\n") # Write the edges per d_graph for d_graph in self.all_d_graphs: fp.write(f"{d_graph.idx} {' '.join([str(self.edge_idxs[e]) for e in d_graph.edges])}\n") # Write the distances for d_graph in self.all_d_graphs: for neighbor_idx, dist in self.distances[d_graph.idx].items(): fp.write(f"{d_graph.idx} {neighbor_idx} {dist}\n") def create_index_from_tuples(self, tuple_size=3): index = {} perfect = 0 for node in self.d_graphs: for dg in self.d_graphs[node]: nodelist = dg.to_list() nodelist.sort() if len(nodelist) < tuple_size: continue # Generate all tuplesize-mers for dmer in itertools.combinations(nodelist, tuple_size): if not dmer in index: index[dmer] = [dg] else: index[dmer].append(dg) for dg in self.all_d_graphs: nodelist = dg.to_list() nodelist.sort() if len(nodelist) < tuple_size: continue # Generate all tuplesize-mers for dmer in itertools.combinations(nodelist, tuple_size): if not dmer in index: index[dmer] = [dg] else: index[dmer].append(dg) return index def compute_distances(self): distances = {} distances = {dg.idx:{} for dg in self.all_d_graphs} for dmer, dgraphs in self.index.items(): if len(dgraphs) == 1: continue for idx1, dg1 in enumerate(dgraphs): # Add dist dict for dg1 if not dg1 in distances: distances[dg1] = {} for idx2 in range(idx1+1, len(dgraphs)): dg2 = dgraphs[idx2] if dg1 == dg2: continue # Add dist dict for dg2 if not dg2 in distances: distances[dg2] = {} # Distance computing and adding in the dist dicts d = dg1.distance_to(dg2) distances[dg1][dg2] = d distances[dg2][dg1] = d if len(distances[dg1]) == 0: del distances[dg1] distances[dg1.idx][dg2.idx] = d distances[dg2.idx][dg1.idx] = d return distances ... ... @@ -86,8 +103,8 @@ class D2Graph(object): index = {} perfect = 0 for node in self.d_graphs: for dg in self.d_graphs[node]: for node in self.d_graphs_per_node: for dg in self.d_graphs_per_node[node]: lst = dg.to_ordered_lists() # Generate all dmers without the first node # pull all the values ... ... @@ -133,7 +150,7 @@ class D2Graph(object): for prev_node in self.index[dmer][:d_idx]: G.add_edge(nodes[dg], nodes[prev_node]) return G, nodes return G, bidict(nodes) \ No newline at end of file
 import networkx as nx class Path(object): ... ... @@ -6,9 +8,86 @@ class Path(object): self.nodes = [] def add_nodes(self, nodes): self.nodes.extend(nodes) def add_path(self, path): self.add_nodes(path.nodes) def get_score(self): return 0 # class D2_Path_finder(object): def __repr__(self): return f"[{','.join([str(x) for x in self.nodes])}]" def save_d2(self, d2g, file): if len(self.nodes) == 0: return graph_path = nx.Graph() graph_path.add_node(self.nodes[0]) for idx, node in enumerate(self.nodes[1:]): dg1 = d2g.nodes.inverse[node] graph_path.add_node(node) prev_node = self.nodes[idx] dg2 = d2g.nodes.inverse[prev_node] if dg1 in d2g.distances[dg2]: graph_path.add_edge(node, prev_node) nx.write_gexf(graph_path, file) class Unitig(Path): def __init__(self, d2g, node_list): super(Unitig, self).__init__() self.nodes = node_list self.d2g = d2g def add_left(self, node): self.nodes.insert(0,node) def add_right(self, node): self.nodes.append(node) def get_original_edges(self): edges = [] for node in self.nodes: dg = self.d2g.nodes.inverse[node] edges.extend(dg.edges) return frozenset(edges) class UnitigGraph(): def __init__(self, d2g, unitigs): self.d2g = dg2 self.unitigs = unitigs def compute_unitig_graph(self): ug = nx.Graph() # Get unitig extremities and index them borders = {} for unitig in self.unitigs: borders.append[unitig.nodes[0]] = unitig if len(unitig.nodes) > 1: borders.append[unitig.nodes[-1]] = unitig border_nodes = frozenset(borders.keys()) # link borders together for unitig in self.unitigs: pass
 ... ... @@ -8,11 +8,13 @@ class Dgraph(object): """docstring for Dgraph""" def __init__(self, center): super(Dgraph, self).__init__() self.idx = -1 self.center = center self.score = 0 self.halves = [None,None] self.connexity = [None,None] self.nodes = [center] self.edges = [] """ Compute the d-graph quality (score) according to the connectivity between the two halves. ... ... @@ -27,6 +29,7 @@ class Dgraph(object): self.nodes = sorted([self.center] + h1 + h2) self.connexity[0] = {key:0 for key in self.halves[0]} self.connexity[1] = {key:0 for key in self.halves[1]} self.edges = [] # Compute link arities for node1 in h1: ... ... @@ -38,6 +41,12 @@ class Dgraph(object): self.connexity[0][node1] += 1 self.connexity[1][node2] += 1 if node1 < node2: self.edges.append((node1, node2)) elif node2 < node1: self.edges.append((node2, node1)) # Sort the halves by descending connexity connex = self.connexity self.halves[0].sort(reverse=True, key=lambda v: connex[0][v]) ... ...
 ... ... @@ -9,7 +9,7 @@ import itertools import d_graph as dg import d2_graph as d2 from d2_algorithms import greedy_reduct, filter_singeltons, compute_unitigs from d2_algorithms import greedy_reduct, filter_singeltons, compute_unitigs, compute_path_from_unitigs def main(): # Parsing the input file ... ... @@ -21,18 +21,20 @@ def main(): G = nx.read_gexf(filename) d2g = d2.D2Graph(G) d2g.save("data/optimization.tsv") G, names = d2g.to_nx_graph() nx.write_gexf(G, "data/d2_graph.gexf") print("Greedy reduction of the graph") greedy = filter_singeltons(greedy_reduct(d2g)) nx.write_gexf(greedy, "data/d2_graph_greedy.gexf") print("Compute unitigs from greedy reducted graph") unitigs = compute_unitigs(greedy) print("\n".join([str(x) for x in unitigs])) print(len(unitigs)) unitigs = compute_unitigs(greedy, d2g) # Compute greedy complete path from unitigs regarding most efficient path between them path = compute_path_from_unitigs(d2g, unitigs) path.save_d2(d2g, "data/d2_greedy_path.gexf") if __name__ == "__main__": ... ...
 networkx>=2.2 termcolor>=1.1 \ No newline at end of file termcolor>=1.1 bidict>=0.18 \ No newline at end of file
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!