Skip to content
Snippets Groups Projects
Select Git revision
  • 5cc89481fff1c4379f0dc45aa4a79ddf5e12e12c
  • master default protected
  • dev
  • score_test
4 results

path_algorithms.py

Blame
  • user avatar
    Yoann Dufresne authored
    5cc89481
    History
    path_algorithms.py 5.50 KiB
    from deconvolution.lcpgraph.lcp_path import Path
    
    """ Greedy algorithm. Start with th most probable unitig (ie lowest normalized penalty first and largest unitig for
        equalities). Then extends on both side to the nearest interesting unitig.
        an interesting unitig is a unitig that add new covering variables.
    """
    def construct_path_from_unitigs(unitigs, d2g):
        # Initiate covering variables
        used_variables = {x: False for x in d2g.barcode_edge_idxs}
    
        # Initiate the path with longest sure unitig
        path = Path(d2g)
        first_utg = unitigs.pop(0)
        path.add_path(first_utg)
        print(len(path), path.covering_score())
    
        # While it's possible, search to add unitigs
        while len(unitigs) > 0:
            # Search for next unitig
            path_extension = _search_way_to_next_unitig(path, unitigs, d2g)
            if path_extension is None:
                break
            # TODO: Search for unitigs on the other side of the path
    
            # Add the unitig to the path
            path.add_path(path_extension)
            print(len(path), path.covering_score())
    
            unitigs = [utg for utg in unitigs if path.covering_difference(utg) > 0]
    
        return path
    
    
    """ Look for the next unitig to join from right endpoint of the path
        @param path Currently created path
        @param unitigs List of unitigs of interest (be careful to fill this list with unitigs that add new covered variables
        @param lcpg The lcpg to use as path support
        @return (path_to, utg) Respectively the path to the next unitig of interest (from the right of the current path) and
        the corresponding unitig (linked to the path from left to right)
    """
    def _search_way_to_next_unitig(path, unitigs, d2g):
        # Register endpoints
        endpoints = {}
        for utg in unitigs:
            endpoints[utg[0]] = utg
            if len(utg) > 1:
                endpoints[utg[-1]] = utg
    
        # Search for next endpoint
        best_paths = _search_endpoint(path[-1], endpoints, d2g, path)
        if len(best_paths) == 0:
            return None
    
        # Search for the best path (unitig included) to add (The one that cover the most new variables)
        the_best_path = None
        covered_variables = 0
        for extension in best_paths:
            utg = endpoints[extension[-1]]
            # if the utg is in the wrong size
            if utg[-1] == extension[-1]:
                utg.reverse()
    
            complete_path = Path(d2g)
            complete_path.add_path(extension[1:])
            complete_path.add_path(utg[1:])
    
            num_new_vars = path.covering_difference(complete_path)
            if num_new_vars > covered_variables:
                covered_variables = num_new_vars
                the_best_path = complete_path
    
        return the_best_path
    
    
    """ Search the min-penalty paths from a start udg to any of the target nodes.
        @param start_udg The start node of the path
        @param targets A list of udg of interest
        @param lcpg The lcpg to follow
        @param forbidden_udgs excluded nodes from the lcpg
        @return A list of equivalent paths
    """
    def _search_endpoint(start_udg, targets, d2g, forbidden_udgs):
        marked_nodes = {x: (None, None) for x in forbidden_udgs}
        covered_variables = set(forbidden_udgs.covering_variables.keys())
    
        # Init Dijkstra
        to_explore = [start_udg]
        marked_nodes[start_udg] = (0, None)
        found = []
    
        # Dijkstra part 1, compute penalties and previous nodes
        while len(to_explore) > 0 and len(found) == 0:
            # Select min penalty in to_explore
            current = min(to_explore, key=lambda x: marked_nodes[x][0])
            current_penalty = marked_nodes[current][0]
            to_explore.remove(current)
    
            # Filter neighbors by there covering values
            neighbors = d2g.neighbors(str(current.idx))
            filtered_neighbors = []
            for n in neighbors:
                nei = d2g.node_by_idx[int(n)]
                if len(d2g.get_covering_variables([nei]) - covered_variables) > 0:
                    filtered_neighbors.append(n)
            neighbors = filtered_neighbors
    
            # Explore all the neighbors of the current node.
            for nei_idx in neighbors:
                nei_udg = d2g.node_by_idx[int(nei_idx)]
                penalty = current_penalty + d2g[nei_idx][str(current.idx)]["distance"]
    
                if nei_udg in marked_nodes:
                    # Forbidden node
                    if marked_nodes[nei_udg][0] is None:
                        continue
                    # Already inserted but update the min penalty to reach the udg
                    if penalty < marked_nodes[nei_udg][0]:
                        marked_nodes[nei_udg] = penalty, current
                else:
                    # First encounter: insert to explore and set min penalty
                    marked_nodes[nei_udg] = (penalty, current)
                    to_explore.append(nei_udg)
                    # Set stop if a target is found
                    if nei_udg in targets:
                        if len(found) == 0:
                            found.append(nei_udg)
                        elif marked_nodes[found[0]][0] == penalty:
                            found.append(nei_udg)
                        elif marked_nodes[found[0]][0] > penalty:
                            found = [nei_udg]
    
        # Dijkstra part 2, retrieve the best paths
        paths = []
        for target in found:
            # Start a path
            node_path = [target]
    
            # Add all nodes one by one to the first
            while node_path[-1] != start_udg:
                node_path.append(marked_nodes[node_path[-1]][1])
    
            # Reverse the path to be on the right order
            node_path.reverse()
            # Create and add a real path object
            path = Path(d2g)
            path.add_path(node_path)
            paths.append(path)
    
        return paths