import networkx as nx from collections import Counter import sys """ Represent an udg path into a d2g graph """ class Path(list): def __init__(self, d2g): super(Path, self).__init__() self.d2g = d2g self.covering_variables = {x: 0 for x in self.d2g.variables} self.covering_value = 0 # a succession of Counter (multiset) self.barcode_order = [] self.lcp_per_multiset = [] self.barcode_score = 0 self.size_score = 0 def append(self, obj) -> None: lcp = self.d2g.get_lcp(obj) # Update the covering variables for edge_idx in lcp.edges: if self.covering_variables[edge_idx] == 0: self.covering_value += 1 self.covering_variables[edge_idx] += 1 self._append_barcodes(lcp) super(Path, self).append(lcp) def _append_barcodes(self, lcp): set_idx = len(self.barcode_order) - 1 remaining_barcodes = Counter(lcp.nodes) while set_idx >= 0: current_multiset = self.barcode_order[set_idx] intersection = remaining_barcodes & current_multiset if intersection == current_multiset: remaining_barcodes -= intersection self.lcp_per_multiset[set_idx].add(lcp) self.barcode_score += 1 else: if len(intersection) == 0: break # Split the multiset in two parts left = current_multiset - intersection right = intersection self.barcode_order[set_idx] = right self.barcode_order.insert(set_idx, left) # Split the lcp appearance self.lcp_per_multiset.insert(set_idx, self.lcp_per_multiset[set_idx].copy()) self.lcp_per_multiset[set_idx+1].add(lcp) self.barcode_score += len(self.lcp_per_multiset[set_idx+1]) # Update remaining barcodes remaining_barcodes -= intersection break set_idx -= 1 if len(remaining_barcodes) > 0: self.barcode_order.append(remaining_barcodes) self.lcp_per_multiset.append({lcp}) self.barcode_score += 1 def pop(self, index=-1): if index != -1: print("Warning: pop on other values than -1 have side effects here. The code will be adapted soon", file=sys.stderr) exit(1) index = -1 lcp = super(Path, self).pop(index) self._pop_barcodes(lcp) # Update the covering variables for e_idx in lcp.edges: self.covering_variables[e_idx] -= 1 if self.covering_variables[e_idx] == 0: self.covering_value -= 1 return lcp def _pop_barcodes(self, lcp): set_idx = len(self.barcode_order) - 1 while set_idx >= 0 and lcp in self.lcp_per_multiset[set_idx]: lcp_list = self.lcp_per_multiset[set_idx] lcp_list.remove(lcp) self.barcode_score -= 1 if len(lcp_list) == 0: self.lcp_per_multiset.pop(set_idx) self.barcode_order.pop(set_idx) set_idx -= 1 if 0 <= set_idx < len(self.lcp_per_multiset) - 1 and self.lcp_per_multiset[set_idx] == self.lcp_per_multiset[set_idx+1]: rmv = self.lcp_per_multiset.pop(set_idx) self.barcode_score -= len(rmv) self.barcode_order[set_idx+1] += self.barcode_order[set_idx] self.barcode_order.pop(set_idx) def copy(self): copy = Path(self.d2g) # Copy the list for x in self: super(Path, copy).append(x) # Copy the variables for key, val in self.covering_variables.items(): copy.covering_variables[key] = val copy.covering_value = self.covering_value # Copy barcode structures for lcp_list in self.lcp_per_multiset: copy.lcp_per_multiset.append(lcp_list.copy()) for barcodes in self.barcode_order: copy.barcode_order.append(barcodes.copy()) copy.barcode_score = self.barcode_score return copy def add_path(self, path): for lcp in path: self.append(lcp) def covering_score(self): return self.covering_value / len(self.covering_variables) def save_gexf(self, filename): d2p = nx.Graph() # Add the nodes for udg in self: d2p.add_node(udg.idx) d2p.nodes[udg.idx]["center"] = udg.center d2p.nodes[udg.idx]["udg"] = str(udg) d2p.nodes[udg.idx]["score"] = f"{udg.score}/{udg.get_optimal_score()}" barcode_edges = " ".join([str(x) for x in udg.edges]) d2p.nodes[udg.idx]["barcode_edges"] = barcode_edges # add the edges for idx in range(len(self)-1): udg1 = self[idx] udg2 = self[idx+1] d2p.add_edge(udg1.idx, udg2.idx) nx.write_gexf(d2p, filename)