Select Git revision
d2_path.py 5.14 KiB
import networkx as nx
from collections import Counter
import sys
""" Represent an udg path into a d2g graph
"""
class Path(list):
def __init__(self, d2g):
super(Path, self).__init__()
self.d2g = d2g
self.covering_variables = {x: 0 for x in self.d2g.barcode_edge_idxs.values()}
self.covering_value = 0
# a succession of Counter (multiset)
self.barcode_order = []
self.lcp_per_multiset = []
self.barcode_score = 0
self.size_score = 0
def append(self, obj) -> None:
lcp = self.d2g.get_lcp(obj)
# Update the covering variables
for barcode_edge in lcp.edges:
edge_idx = self.d2g.barcode_edge_idxs[barcode_edge]
if self.covering_variables[edge_idx] == 0:
self.covering_value += 1
self.covering_variables[edge_idx] += 1
self._append_barcodes(lcp)
super(Path, self).append(lcp)
def _append_barcodes(self, lcp):
set_idx = len(self.barcode_order) - 1
remaining_barcodes = Counter(lcp.nodes)
while set_idx >= 0:
current_multiset = self.barcode_order[set_idx]
intersection = remaining_barcodes & current_multiset
if intersection == current_multiset:
remaining_barcodes -= intersection
self.lcp_per_multiset[set_idx].add(lcp)
self.barcode_score += 1
else:
if len(intersection) == 0:
break
# Split the multiset in two parts
left = current_multiset - intersection
right = intersection
self.barcode_order[set_idx] = right
self.barcode_order.insert(set_idx, left)
# Split the lcp appearance
self.lcp_per_multiset.insert(set_idx, self.lcp_per_multiset[set_idx].copy())
self.lcp_per_multiset[set_idx+1].add(lcp)
self.barcode_score += len(self.lcp_per_multiset[set_idx+1])
# Update remaining barcodes
remaining_barcodes -= intersection
break
set_idx -= 1
if len(remaining_barcodes) > 0:
self.barcode_order.append(remaining_barcodes)
self.lcp_per_multiset.append({lcp})
self.barcode_score += 1
def pop(self, index=-1):
if index != -1:
print("Warning: pop on other values than -1 have side effects here. The code will be adapted soon", file=sys.stderr)
exit(1)
index = -1
lcp = super(Path, self).pop(index)
self._pop_barcodes(lcp)
# Update the covering variables
for barcode_edge in lcp.edges:
edge_idx = self.d2g.barcode_edge_idxs[barcode_edge]
self.covering_variables[edge_idx] -= 1
if self.covering_variables[edge_idx] == 0:
self.covering_value -= 1
return lcp
def _pop_barcodes(self, lcp):
set_idx = len(self.barcode_order) - 1
while set_idx >= 0 and lcp in self.lcp_per_multiset[set_idx]:
lcp_list = self.lcp_per_multiset[set_idx]
lcp_list.remove(lcp)
self.barcode_score -= 1
if len(lcp_list) == 0:
self.lcp_per_multiset.pop(set_idx)
self.barcode_order.pop(set_idx)
set_idx -= 1
if 0 <= set_idx < len(self.lcp_per_multiset) - 1 and self.lcp_per_multiset[set_idx] == self.lcp_per_multiset[set_idx+1]:
rmv = self.lcp_per_multiset.pop(set_idx)
self.barcode_score -= len(rmv)
self.barcode_order[set_idx+1] += self.barcode_order[set_idx]
self.barcode_order.pop(set_idx)
def copy(self):
copy = Path(self.d2g)
# Copy the list
for x in self:
super(Path, copy).append(x)
# Copy the variables
for key, val in self.covering_variables.items():
copy.covering_variables[key] = val
copy.covering_value = self.covering_value
# Copy barcode structures
for lcp_list in self.lcp_per_multiset:
copy.lcp_per_multiset.append(lcp_list.copy())
for barcodes in self.barcode_order:
copy.barcode_order.append(barcodes.copy())
copy.barcode_score = self.barcode_score
return copy
def add_path(self, path):
for lcp in path:
self.append(lcp)
def covering_score(self):
return self.covering_value / len(self.covering_variables)
def save_gexf(self, filename):
d2p = nx.Graph()
# Add the nodes
for udg in self:
d2p.add_node(udg.idx)
d2p.nodes[udg.idx]["center"] = udg.center
d2p.nodes[udg.idx]["udg"] = str(udg)
d2p.nodes[udg.idx]["score"] = f"{udg.score}/{udg.get_optimal_score()}"
barcode_edges = " ".join([str(self.d2g.barcode_edge_idxs[x]) for x in udg.edges])
d2p.nodes[udg.idx]["barcode_edges"] = barcode_edges
# add the edges
for idx in range(len(self)-1):
udg1 = self[idx]
udg2 = self[idx+1]
d2p.add_edge(udg1.idx, udg2.idx)
nx.write_gexf(d2p, filename)