Skip to content
Snippets Groups Projects
Select Git revision
  • 45503263b302bffdeaeea1b1ed71d7b5e67b437a
  • master default protected
  • dev
  • score_test
4 results

d2_path.py

Blame
  • d2_path.py 5.14 KiB
    import networkx as nx
    from collections import Counter
    import sys
    
    
    """ Represent an udg path into a d2g graph
    """
    class Path(list):
        """List of lcp objects forming a path in a d2g graph, with running scores.

        The elements stored in the list are the objects returned by
        ``d2g.get_lcp``. ``append`` and ``pop`` (last element only) keep the
        following bookkeeping in sync with the list content:

        * ``covering_variables``: maps every barcode-edge index of the d2g to
          the number of times that edge is listed in ``edges`` of the stored
          elements.
        * ``covering_value``: number of barcode-edge indexes whose counter is
          non-zero.
        * ``barcode_order``: list of ``Counter`` multisets built from the
          ``nodes`` of the stored elements.
        * ``lcp_per_multiset``: list parallel to ``barcode_order``; entry ``i``
          is the set of stored elements attached to multiset ``i``.
        * ``barcode_score``: running counter updated together with
          ``lcp_per_multiset``.
        * ``size_score``: initialised to 0; this class never updates it.

        The other inherited ``list`` mutators (``insert``, ``extend``,
        ``remove``, ...) are not overridden and therefore do not update the
        bookkeeping.
        """

        def __init__(self, d2g):
            """Create an empty path bound to ``d2g``.

            ``d2g`` must expose ``barcode_edge_idxs`` (a mapping from barcode
            edge to index) and ``get_lcp``.
            """
            super().__init__()
            self.d2g = d2g
            self.covering_variables = {x: 0 for x in self.d2g.barcode_edge_idxs.values()}
            self.covering_value = 0
            # a succession of Counter (multiset)
            self.barcode_order = []
            self.lcp_per_multiset = []
            self.barcode_score = 0
            self.size_score = 0

        def append(self, obj) -> None:
            """Append ``d2g.get_lcp(obj)`` to the path and update the scores."""
            lcp = self.d2g.get_lcp(obj)

            # Update the covering variables
            for barcode_edge in lcp.edges:
                edge_idx = self.d2g.barcode_edge_idxs[barcode_edge]
                if self.covering_variables[edge_idx] == 0:
                    # First time this barcode edge is covered by the path.
                    self.covering_value += 1
                self.covering_variables[edge_idx] += 1

            self._append_barcodes(lcp)

            super().append(lcp)

        def _append_barcodes(self, lcp):
            """Merge the nodes of ``lcp`` into ``barcode_order``.

            The multisets are scanned from the last one backwards. A multiset
            fully contained in the not-yet-placed nodes simply receives
            ``lcp``. The first partially shared multiset is split in two (the
            unshared part stays on the left, the shared part moves to the
            right and receives ``lcp``) and the scan stops. Nodes that were
            not placed form a new multiset at the end.
            """
            set_idx = len(self.barcode_order) - 1
            remaining_barcodes = Counter(lcp.nodes)

            while set_idx >= 0:
                current_multiset = self.barcode_order[set_idx]
                intersection = remaining_barcodes & current_multiset

                if intersection == current_multiset:
                    # The whole multiset is shared with the new lcp.
                    remaining_barcodes -= intersection
                    self.lcp_per_multiset[set_idx].add(lcp)
                    self.barcode_score += 1
                else:
                    if not intersection:
                        # Nothing in common: stop scanning.
                        break
                    # Split the multiset in two parts
                    left = current_multiset - intersection
                    right = intersection
                    self.barcode_order[set_idx] = right
                    self.barcode_order.insert(set_idx, left)
                    # Split the lcp appearance: both halves inherit the lcps of
                    # the original multiset, only the right one gets the new lcp.
                    self.lcp_per_multiset.insert(set_idx, self.lcp_per_multiset[set_idx].copy())
                    self.lcp_per_multiset[set_idx + 1].add(lcp)
                    self.barcode_score += len(self.lcp_per_multiset[set_idx + 1])
                    # Update remaining barcodes
                    remaining_barcodes -= intersection
                    break
                set_idx -= 1

            if remaining_barcodes:
                self.barcode_order.append(remaining_barcodes)
                self.lcp_per_multiset.append({lcp})
                self.barcode_score += 1

        def pop(self, index=-1):
            """Remove and return the last element, rolling back the scores.

            Only the last element can be removed: ``index`` must be ``-1`` or
            ``len(self) - 1``. Any other value prints a warning on stderr and
            terminates the program with exit status 1 (``SystemExit``).

            Raises:
                IndexError: if the path is empty.
            """
            if index not in (-1, len(self) - 1):
                print("Warning: pop on other values than -1 have side effects here. The code will be adapted soon", file=sys.stderr)
                # sys.exit instead of the bare exit() builtin, which is only
                # installed by the site module.
                sys.exit(1)
            lcp = super().pop()
            self._pop_barcodes(lcp)

            # Update the covering variables
            for barcode_edge in lcp.edges:
                edge_idx = self.d2g.barcode_edge_idxs[barcode_edge]
                self.covering_variables[edge_idx] -= 1
                if self.covering_variables[edge_idx] == 0:
                    # The barcode edge is no longer covered by the path.
                    self.covering_value -= 1

            return lcp

        def _pop_barcodes(self, lcp):
            """Undo the effect of ``_append_barcodes`` for the last ``lcp``.

            ``lcp`` is detached from the trailing multisets that contain it
            (multisets left without any lcp are dropped). If the two multisets
            at the boundary then hold the same lcps, they are merged back into
            one.
            """
            set_idx = len(self.barcode_order) - 1

            while set_idx >= 0 and lcp in self.lcp_per_multiset[set_idx]:
                lcp_list = self.lcp_per_multiset[set_idx]
                lcp_list.remove(lcp)
                self.barcode_score -= 1
                if not lcp_list:
                    self.lcp_per_multiset.pop(set_idx)
                    self.barcode_order.pop(set_idx)
                set_idx -= 1

            # set_idx is now the first multiset (from the end) without lcp.
            if 0 <= set_idx < len(self.lcp_per_multiset) - 1 and self.lcp_per_multiset[set_idx] == self.lcp_per_multiset[set_idx + 1]:
                rmv = self.lcp_per_multiset.pop(set_idx)
                self.barcode_score -= len(rmv)
                # Fold the left multiset into the right one, then drop the left.
                self.barcode_order[set_idx + 1] += self.barcode_order[set_idx]
                self.barcode_order.pop(set_idx)

        def copy(self):
            """Return an independent ``Path`` sharing the same ``d2g``.

            The stored elements themselves are shared; the list and every
            bookkeeping container are duplicated, and all scores (including
            ``size_score``) are carried over.
            """
            duplicate = Path(self.d2g)

            # Copy the list without going through Path.append: the bookkeeping
            # is copied wholesale below instead of being recomputed.
            list.extend(duplicate, self)

            # Copy the variables
            duplicate.covering_variables.update(self.covering_variables)
            duplicate.covering_value = self.covering_value

            # Copy barcode structures
            duplicate.lcp_per_multiset = [lcps.copy() for lcps in self.lcp_per_multiset]
            duplicate.barcode_order = [barcodes.copy() for barcodes in self.barcode_order]
            duplicate.barcode_score = self.barcode_score
            duplicate.size_score = self.size_score

            return duplicate

        def add_path(self, path):
            """Append every element of ``path`` in order, through ``append``."""
            for lcp in path:
                self.append(lcp)

        def covering_score(self):
            """Return the fraction of barcode edges covered by the path.

            Returns 0.0 when the d2g declares no barcode edge at all.
            """
            if not self.covering_variables:
                return 0.0
            return self.covering_value / len(self.covering_variables)

        def save_gexf(self, filename):
            """Write the path to ``filename`` as a GEXF graph.

            Each stored element becomes a node keyed by its ``idx`` with the
            attributes ``center``, ``udg``, ``score`` and ``barcode_edges``;
            consecutive elements are linked by an edge.
            """
            d2p = nx.Graph()
            # Add the nodes
            for udg in self:
                barcode_edges = " ".join(str(self.d2g.barcode_edge_idxs[x]) for x in udg.edges)
                d2p.add_node(
                    udg.idx,
                    center=udg.center,
                    udg=str(udg),
                    score=f"{udg.score}/{udg.get_optimal_score()}",
                    barcode_edges=barcode_edges,
                )

            # add the edges between consecutive elements
            for udg1, udg2 in zip(self, self[1:]):
                d2p.add_edge(udg1.idx, udg2.idx)

            nx.write_gexf(d2p, filename)