import networkx as nx
import itertools
from bidict import bidict
import sys

# from deconvolution.dgraph.FixedDGIndex import FixedDGIndex
from deconvolution.dgraph.VariableDGIndex import VariableDGIndex
from deconvolution.dgraph.d_graph import Dgraph
from deconvolution.dgraph.CliqueDGFactory import CliqueDGFactory
from deconvolution.dgraph.LouvainDGFactory import LouvainDGFactory


class D2Graph(nx.Graph):
    """D2Graph (read it (d-graph)²).

    A networkx graph whose nodes are d-graph indices (``Dgraph.idx``) and
    whose edges carry a ``distance`` attribute computed with
    ``Dgraph.distance_to``.
    """

    def __init__(self, debug=False, debug_path='.'):
        """Create an empty D2Graph.

        Args:
            debug: Flag forwarded to the clique d-graph factory.
            debug_path: Path forwarded to the clique d-graph factory.
        """
        super(D2Graph, self).__init__()
        self.all_d_graphs = []
        self.d_graphs_per_node = {}
        self.node_by_idx = {}
        self.barcode_graph = None
        self.index = None

        self.variables = set()
        self.variables_per_lcp = {}

        # (u, v) and (v, u) -> index of the edge in the barcode graph
        self.barcode_edge_idxs = {}
        self.nb_uniq_edge = 0

        self.debug = debug
        self.debug_path = debug_path

    def subgraph(self, nodes):
        """Return a new D2Graph restricted to ``nodes``.

        Redefines ``nx.Graph.subgraph`` to avoid type instantiation errors:
        the result is a plain, independent D2Graph (not a networkx view).

        Node attributes are copied, and only the ``distance`` attribute of
        each kept edge is copied. ``barcode_graph``, ``barcode_edge_idxs`` and
        ``node_by_idx`` are shared with ``self``; ``variables`` is a shallow
        copy.

        Args:
            nodes: Iterable of node identifiers to keep.

        Returns:
            The induced D2Graph.
        """
        nodes = frozenset(nodes)

        # The constructor takes (debug, debug_path), not a barcode graph:
        # forward the debug settings and attach the barcode graph explicitly.
        G = D2Graph(debug=self.debug, debug_path=self.debug_path)
        G.barcode_graph = self.barcode_graph
        G.barcode_edge_idxs = self.barcode_edge_idxs

        # Add sub-nodes
        for node in nodes:
            G.add_node(node)
            G.nodes[node].update(self.nodes[node])

        # Add edges whose two endpoints are both kept
        for node1, node2, data in self.edges(data=True):
            if node1 in nodes and node2 in nodes:
                G.add_edge(node1, node2, distance=data["distance"])

        # Node by idx
        G.node_by_idx = self.node_by_idx
        G.variables = self.variables.copy()

        return G

    def clone(self):
        """Return a copy of the graph, built with ``subgraph`` on all nodes."""
        return self.subgraph(list(self.nodes()))

    def construct_from_barcodes(self, barcode_graph, neighbor_threshold=0.25,
                                min_size_clique=4, verbose=False,
                                clique_mode=None, threads=1):
        """Build the d2-graph from a barcode graph.

        Numbers the barcode graph edges, generates the d-graphs with the
        selected factory, numbers them, then creates the nodes and edges via
        ``create_graph_from_node_neighborhoods``.

        Args:
            barcode_graph: Graph of barcodes; each edge receives a
                ``uniq_idx`` attribute.
            neighbor_threshold: Threshold passed to
                ``create_graph_from_node_neighborhoods``.
            min_size_clique: Passed to ``CliqueDGFactory``.
            verbose: Print progress information when true.
            clique_mode: ``"louvain"`` selects ``LouvainDGFactory``; any other
                value selects ``CliqueDGFactory``.
            threads: Passed to the factory's ``generate_all_dgraphs``.
        """
        self.barcode_graph = barcode_graph

        # Number the edges from original graph
        for idx, edge in enumerate(self.barcode_graph.edges()):
            # NOTE(review): this equality only holds for self-loops
            # (edge[0] == edge[1]) -- confirm that this is the intended
            # meaning of nb_uniq_edge.
            if edge == (edge[1], edge[0]):
                self.nb_uniq_edge += 1
            if edge in self.barcode_edge_idxs:
                print("Edge already present")
            # Register both orientations under the same index
            self.barcode_edge_idxs[edge] = idx
            self.barcode_edge_idxs[(edge[1], edge[0])] = idx
            self.barcode_graph[edge[0]][edge[1]]["uniq_idx"] = idx

        # Compute all the d-graphs
        if verbose:
            print("Computing the unit d-graphs..")
        if clique_mode == "louvain":
            dg_factory = LouvainDGFactory(self.barcode_graph)
        else:
            dg_factory = CliqueDGFactory(
                self.barcode_graph,
                min_size_clique=min_size_clique,
                debug=self.debug,
                debug_path=self.debug_path,
            )
        self.d_graphs_per_node = dg_factory.generate_all_dgraphs(
            threads=threads, verbose=verbose
        )
        if verbose:
            counts = sum(len(x) for x in self.d_graphs_per_node.values())
            print(f"\t {counts} computed d-graphs")
        for d_graphs in self.d_graphs_per_node.values():
            self.all_d_graphs.extend(d_graphs)

        # Number the d_graphs
        for idx, d_graph in enumerate(self.all_d_graphs):
            d_graph.idx = idx
            self.node_by_idx[idx] = d_graph

        if verbose:
            print("Compute the graph")
        # Create the graph
        self.bidict_nodes = self.create_graph_from_node_neighborhoods(
            neighbor_threshold
        )

    def get_lcp(self, obj):
        """Resolve ``obj`` to the object stored in ``node_by_idx``.

        Args:
            obj: A string (converted with ``int``), an integer index, or any
                other object, which is returned unchanged.

        Returns:
            ``self.node_by_idx[idx]`` for string/integer input, otherwise
            ``obj`` itself.

        Raises:
            KeyError: If the index is not present in ``node_by_idx``.
            ValueError: If a string cannot be converted to an integer.
        """
        if isinstance(obj, str):
            obj = int(obj)
        if isinstance(obj, int):
            obj = self.node_by_idx[obj]
        return obj

    def get_covering_variables(self, obj):
        """Return the list of ``edges`` of the d-graph designated by ``obj``.

        The list is cached in ``variables_per_lcp`` on first request, so the
        same list object is returned on subsequent calls.

        Args:
            obj: Anything accepted by ``get_lcp``.
        """
        lcp = self.get_lcp(obj)
        if lcp not in self.variables_per_lcp:
            self.variables_per_lcp[lcp] = list(lcp.edges)

        return self.variables_per_lcp[lcp]

    def load(self, filename):
        """Load nodes, edges and d-graphs from a GEXF file.

        Each node must carry the ``udg``, ``score`` and ``barcode_edges``
        attributes, and each edge a ``distance`` attribute.

        Args:
            filename: Path given to ``nx.read_gexf``.
        """
        # Reload the graph
        G = nx.read_gexf(filename)
        for node, attrs in G.nodes(data=True):
            self.add_node(node)
            self.nodes[node].update(attrs)
        for node1, node2, data in G.edges(data=True):
            self.add_edge(node1, node2, distance=data["distance"])

        # Extract d-graphs from nx graph
        # self.node_by_name = {}
        loaded = {}
        for node, data in self.nodes(data=True):
            dg = Dgraph.load(data["udg"], data["score"], data["barcode_edges"])
            self.variables.update(dg.edges)
            loaded[node] = dg
            self.all_d_graphs.append(dg)
            # -1 is treated as "no index": fall back to the node name
            if dg.idx == -1:
                dg.idx = int(node)
            self.node_by_idx[dg.idx] = dg
            # self.node_by_name[node] = dg
        self.bidict_nodes = bidict(loaded)

    def create_index_from_tuples(self, tuple_size=3, verbose=True):
        """Index the d-graphs by every ``tuple_size``-combination of their nodes.

        Args:
            tuple_size: Number of elements in each indexed tuple.
            verbose: Write a progress counter to stdout when true.

        Returns:
            dict mapping each tuple (a combination of
            ``dg.to_sorted_list()``) to the set of d-graphs producing it.
        """
        index = {}

        if verbose:
            print("\tIndex d-graphs")
        total = len(self.all_d_graphs)
        for lst_idx, dg in enumerate(self.all_d_graphs):
            if verbose:
                sys.stdout.write(f"\r\t{lst_idx+1}/{total}")
                sys.stdout.flush()

            nodelist = dg.to_sorted_list()
            if len(nodelist) < tuple_size:
                continue

            # Generate all tuplesize-mers
            for dmer in itertools.combinations(nodelist, tuple_size):
                index.setdefault(dmer, set()).add(dg)

        if verbose:
            print()

        return index

    def create_graph_from_node_neighborhoods(self, neighborhood_threshold=0.25):
        """Create the nodes and edges of the graph from ``all_d_graphs``.

        One node is created per d-graph. For every non-center node of a
        d-graph, the d-graphs registered in ``d_graphs_per_node`` under
        ``frozenset({node})`` are compared to it, and an edge is added when
        ``distance / (len(dg.nodes) + len(other.nodes))`` does not exceed the
        threshold. Nodes left without any neighbor are removed.

        Args:
            neighborhood_threshold: Maximum accepted distance ratio.

        Returns:
            bidict mapping each remaining d-graph to its node index.
        """
        nodes = {}

        # Create the nodes of d2g from udgs
        for dg in self.all_d_graphs:
            node_id = dg.idx
            nodes[dg] = node_id
            self.add_node(node_id)
            attrs = self.nodes[node_id]
            # Add covering barcode edges
            attrs["barcode_edges"] = " ".join([str(x) for x in dg.edges])
            attrs["score"] = f"{dg.score}/{dg.get_optimal_score()}"
            attrs["udg"] = str(dg)
            attrs["central_node_barcode"] = str(dg.center)

        # Create the edges from neighbor edges
        for dg in self.all_d_graphs:
            for node in dg.to_node_set():
                if node == dg.center:
                    continue
                entry = frozenset({node})
                if entry not in self.d_graphs_per_node:
                    continue
                for colliding_dg in self.d_graphs_per_node[entry]:
                    distance = dg.distance_to(colliding_dg)
                    distance_ratio = distance / (
                        len(dg.nodes) + len(colliding_dg.nodes)
                    )
                    if distance_ratio <= neighborhood_threshold:
                        self.add_edge(
                            nodes[dg], nodes[colliding_dg], distance=distance
                        )

        # Filter out singletons (iterate over a copy: `nodes` is mutated)
        for dg in list(nodes):
            node_id = nodes[dg]
            if self.degree(node_id) == 0:
                self.remove_node(node_id)
                del nodes[dg]

        return bidict(nodes)

    def create_graph_from_index(self):
        """Create the nodes and edges of the graph from ``self.index``.

        D-graphs sharing an index entry are compared pairwise; an edge is
        added when their distance is at most half the size of the smaller
        ``node_set``. ``self.index`` must have been set beforehand.

        Returns:
            bidict mapping each d-graph found in the index to its node index.
        """
        nodes = {}

        for dmer in self.index:
            dgs = list(set(self.index[dmer]))
            for d_idx, dg in enumerate(dgs):
                # Create a node name
                if dg not in nodes:
                    node_id = dg.idx
                    nodes[dg] = node_id

                    # Add the node
                    self.add_node(node_id)
                    attrs = self.nodes[node_id]
                    # Add covering barcode edges
                    attrs["barcode_edges"] = " ".join(
                        [str(self.barcode_edge_idxs[x]) for x in dg.edges]
                    )
                    attrs["score"] = f"{dg.score}/{dg.get_optimal_score()}"
                    attrs["udg"] = str(dg)

                # Add the edges towards the d-graphs already visited
                for prev_dg in dgs[:d_idx]:
                    # Add on small distances
                    d = dg.distance_to(prev_dg)
                    if d <= min(len(dg.node_set)/2, len(prev_dg.node_set)/2):
                        self.add_edge(nodes[dg], nodes[prev_dg], distance=d)

        return bidict(nodes)

    def compute_distances(self):
        """Recompute the ``distance`` attribute of every edge.

        Edges whose two endpoints resolve to equal d-graphs are left
        untouched.
        """
        for x, y, data in self.edges(data=True):
            dg1 = self.node_by_idx[x]
            dg2 = self.node_by_idx[y]
            if dg1 == dg2:
                continue

            # Distance computing and adding in the dist dicts
            data["distance"] = dg1.distance_to(dg2)