Skip to content
Snippets Groups Projects
Select Git revision
  • 6618adb455f9122345499ac5c816a02400fcff4d
  • main default protected
2 results

constants.h

Blame
  • d2_path.py 3.38 KiB
    import networkx as nx
    
    
    """ Represents a udg path in a d2g graph.
    """
    class Path(list):
        """Ordered list of udgs forming a path in a d2g graph.

        Besides the udgs themselves (stored as the list items), a path keeps:

        * ``penalty``: running score updated by ``add_path``;
        * ``d2g``: the graph the path is bound to;
        * ``covering_variables``: one counter per barcode-edge index of the
          graph, counting how many udgs of the path contain that edge.
        """

        def __init__(self, d2g):
            """Create an empty path bound to ``d2g``.

            Args:
                d2g: graph object exposing ``barcode_edge_idxs`` (a mapping
                    from barcode edge to edge index) and subscript access of
                    the form ``d2g[u][v]["distance"]``.
            """
            super().__init__()
            self.penalty = 0
            self.d2g = d2g
            # Every known barcode-edge index starts uncovered.
            self.covering_variables = dict.fromkeys(
                self.d2g.barcode_edge_idxs.values(), 0
            )

        def add_path(self, udgs):
            """Append a sequence of udgs at the end of this path.

            Each edge of each added udg increments its covering counter, and
            the penalty grows by the squared graph distance between every
            added udg and the udg preceding it in the path.

            Args:
                udgs: indexable, sliceable sequence of udg objects exposing
                    ``idx`` and ``edges``. An empty sequence is a no-op.

            Raises:
                KeyError: if a barcode edge is unknown to the graph, or if
                    two consecutive udgs are not linked in the graph.
            """
            if len(udgs) == 0:
                return

            # Register the barcode edges brought by every new udg.
            covering = self.covering_variables
            edge_idxs = self.d2g.barcode_edge_idxs
            for udg in udgs:
                for barcode_edge in udg.edges:
                    covering[edge_idxs[barcode_edge]] += 1

            # Special case for a previously empty path: the first udg has no
            # predecessor to measure a distance against.
            if not self:
                # 4 is the starting penalty chosen by the original author
                # ("ideal case: 1 node of difference with same length on
                # 1 shift").
                self.penalty = 4
                self.append(udgs[0])
                udgs = udgs[1:]

            # Chain the remaining udgs one by one.
            for udg in udgs:
                dist = self.d2g[str(udg.idx)][str(self[-1].idx)]["distance"]
                self.penalty += dist ** 2
                self.append(udg)

        def normalized_penalty(self):
            """Return the penalty divided by the number of udgs in the path.

            An empty path yields ``0.0`` instead of raising
            ``ZeroDivisionError``.
            """
            if not self:
                return 0.0
            return self.penalty / len(self)

        def covering_score(self):
            """Return the fraction of covering variables that are covered.

            A variable is covered when its counter is strictly positive. When
            the graph declares no barcode edge at all the score is ``0.0``
            instead of raising ``ZeroDivisionError``.
            """
            total = len(self.covering_variables)
            if total == 0:
                return 0.0

            covered_num = sum(
                1 for count in self.covering_variables.values() if count > 0
            )
            return covered_num / total

        def covering_difference(self, path):
            """Count the variables covered by ``path`` but not by this path.

            Args:
                path: another object exposing a ``covering_variables`` mapping.

            Returns:
                Number of variables with a positive counter in ``path`` whose
                counter is not positive in ``self``.
            """
            self_covered = frozenset(
                var for var, num in self.covering_variables.items() if num > 0
            )
            return sum(
                1
                for var, val in path.covering_variables.items()
                if val > 0 and var not in self_covered
            )

        def save_path(self, filename):
            """Write the path alone as a GEXF graph.

            One node is created per udg (keyed by ``udg.idx``) with its
            ``center``, string form and ``score/optimal score``; consecutive
            udgs of the path are linked by an edge.

            Args:
                filename: destination handed to ``nx.write_gexf``.
            """
            d2p = nx.Graph()
            # Add the nodes with their descriptive attributes.
            for udg in self:
                d2p.add_node(
                    udg.idx,
                    center=udg.center,
                    udg=str(udg),
                    score=f"{udg.score}/{udg.get_optimal_score()}",
                )

            # Link every udg to its successor in the path.
            for udg1, udg2 in zip(self, self[1:]):
                d2p.add_edge(udg1.idx, udg2.idx)

            nx.write_gexf(d2p, filename)

        def save_path_in_graph(self, filename):
            """Write a copy of the whole graph annotated with the path order.

            The graph is duplicated through ``self.d2g.clone()`` and every
            node belonging to the path receives a ``"path"`` attribute holding
            its position in the path.

            Args:
                filename: destination handed to ``nx.write_gexf``.
            """
            d2c = self.d2g.clone()
            for idx, udg in enumerate(self):
                d2c.nodes[str(udg.idx)]["path"] = idx
            nx.write_gexf(d2c, filename)
    
    
    class Unitig(Path):
        """Path specialization exposing a single-udg append helper."""

        def __init__(self, d2g):
            """Create an empty unitig bound to the graph ``d2g``."""
            super().__init__(d2g)

        def add_right(self, udg):
            """Append one udg at the end of the unitig through ``add_path``."""
            single_udg_path = [udg]
            self.add_path(single_udg_path)
    
    
    def d2_path_to_barcode_path(path):
        """Print, for each udg of a path, the barcodes not shared with its neighbours.

        Every udg is turned into the set of the items returned by its
        ``to_list()`` method. One line is printed per udg with two sets:

        * the barcodes of the udg that are absent from the next udg;
        * the barcodes of the udg that are absent from the previous udg.

        The last udg has no successor and the first has no predecessor, so the
        corresponding set is the full barcode set of that udg. An empty path
        prints nothing, and a single-udg path prints its barcode set twice;
        both cases previously raised ``IndexError``.

        Args:
            path: iterable of udg objects exposing ``to_list()``.

        Returns:
            None. The result is only written to standard output.
        """
        barcode_per_idx = [set(udg.to_list()) for udg in path]
        if not barcode_per_idx:
            # Nothing to report for an empty path.
            return

        diff_barcode_per_idx = []
        rev_diff_barcode_per_idx = []
        for current, following in zip(barcode_per_idx, barcode_per_idx[1:]):
            diff_barcode_per_idx.append(current - following)
            rev_diff_barcode_per_idx.append(following - current)

        # Boundary udgs: the subtracted set is disjoint from the udg's own
        # barcodes, so the result is the udg's full set. With a single udg
        # there is no neighbouring difference at all, hence the empty set.
        last_diff = diff_barcode_per_idx[-1] if diff_barcode_per_idx else set()
        first_rev_diff = (
            rev_diff_barcode_per_idx[0] if rev_diff_barcode_per_idx else set()
        )
        diff_barcode_per_idx.append(barcode_per_idx[-1] - last_diff)
        rev_diff_barcode_per_idx.insert(0, barcode_per_idx[0] - first_rev_diff)

        for diff, rev_diff in zip(diff_barcode_per_idx, rev_diff_barcode_per_idx):
            print(diff, rev_diff)