Skip to content
Snippets Groups Projects
Select Git revision
  • 5cc89481fff1c4379f0dc45aa4a79ddf5e12e12c
  • master default protected
  • dev
  • score_test
4 results

path_optimization.py

Blame
  • path_optimization.py 3.24 KiB
    import random
    from collections import Counter
    
    from deconvolution.lcpgraph.lcp_path import Path
    
    
    class Optimizer:
        """Search a d2g graph for a path with the highest covering value.

        The search is a depth-first exploration with backtracking, driven by an
        explicit stack (see ``bb_solution``).
        """

        # Minimum number of steps in a pass before the "no recent improvement"
        # stop criterion of ``bb_solution`` may trigger.
        STALL_MIN_STEPS = 10000

        def __init__(self, d2g):
            """Store the graph to explore.

            Args:
                d2g: Graph object. The optimizer uses ``d2g.nodes()``,
                    ``d2g[node]`` (neighbours / edge data), ``d2g.node_by_idx``
                    and ``d2g.get_covering_variables(node)``.
            """
            self.d2g = d2g

        def _neighbor_sort_key(self, node, current_node, current_path):
            """Return the sort key of ``node`` as a candidate after ``current_node``.

            Candidates are sorted in ascending order and then consumed with
            ``pop()`` from the end of the list, so the candidate with the
            largest key is explored first.

            The key is a tuple of:
              * 0 when the number of covering variables of ``node`` equals the
                covering value of ``current_path``, 1 otherwise;
              * the negated link divergence of the node;
              * the negated ``"distance"`` of the edge between the node and
                ``current_node``.
            """
            u = self.d2g.node_by_idx[int(node)]
            same_coverage = len(self.d2g.get_covering_variables(node)) == current_path.covering_value
            return (0 if same_coverage else 1,
                    - u.get_link_divergence(),
                    - self.d2g[str(u.idx)][str(current_node)]["distance"])

        def bb_solution(self, start_node=None, verbose=False):
            """Look for a path maximising the covering value.

            Two passes are performed. The first one starts from ``start_node``
            (or from a random node when it is None). When it ends, the search is
            restarted from scratch at the last node that improved the best score
            during the first pass, and the result of this second pass is
            returned.

            A pass ends when every branch has been explored, or when more than
            ``STALL_MIN_STEPS`` steps were performed and at least half of them
            happened since the last score improvement.

            Args:
                start_node: Node from which the first pass starts. A random node
                    of the graph is used when None.
                verbose: When True, print progress messages on stdout.

            Returns:
                The best ``Path`` found. It is returned immediately when its
                covering value reaches the maximal coverage.

            Raises:
                ValueError: If ``start_node`` is None and the graph has no node.
            """
            nb_steps_since_last_score_up = 0
            total_nb_steps = 0
            first_pass = True

            used_nodes = {n: False for n in self.d2g.nodes()}

            # Pick a random starting point when the caller gave none
            if start_node is None:
                if not used_nodes:
                    raise ValueError("Cannot choose a start node in an empty graph")
                start_node = random.choice(list(used_nodes))

            best_path = Path(self.d2g)
            # One list of pending candidates per depth level of the search
            stack = [[start_node]]
            current_path = Path(self.d2g)
            last_increase_node = None

            # NOTE(review): read from a freshly created path; presumably
            # covering_variables then lists every variable that can be covered
            # -- confirm against Path.
            max_cov = len(current_path.covering_variables)

            while stack:
                total_nb_steps += 1
                # 1 - Get next node to extend
                current_node = stack[-1].pop()

                # 2 - Node exploitation
                # 2.1 - Add the node
                used_nodes[current_node] = True
                current_path.append(current_node)
                # 2.2 - If best, save
                if current_path.covering_value > best_path.covering_value:
                    nb_steps_since_last_score_up = 0
                    best_path = current_path.copy()
                    last_increase_node = current_node
                    if verbose:
                        print(f"New best: {len(best_path)} {best_path.covering_value} / {max_cov} ({best_path.barcode_score})")

                    if best_path.covering_value == max_cov:
                        # Nothing better can be found: stop right away
                        if verbose:
                            print("Max coverage")
                        return best_path
                else:
                    nb_steps_since_last_score_up += 1

                # 3 - Neighbors preparation (unused neighbours only)
                neighbors = [x for x in self.d2g[current_node] if not used_nodes[x]]
                neighbors.sort(key=lambda n: self._neighbor_sort_key(n, current_node, current_path))
                stack.append(neighbors)

                # 4 - No more neighbors here - back up
                while not stack[-1]:
                    stack.pop()
                    if not stack:
                        break
                    previous = current_path.pop()
                    used_nodes[previous] = False

                # 5 - Reinit the search from a side and stop when done
                stalled = (total_nb_steps > self.STALL_MIN_STEPS
                           and nb_steps_since_last_score_up >= total_nb_steps / 2)
                if not stack or stalled:
                    # Second pass finished, or no node ever improved the score
                    # (there is then no node to restart from).
                    if not first_pass or last_increase_node is None:
                        return best_path

                    # NOTE(review): the best path of the first pass is dropped
                    # here; only its last improving node is kept, as the new
                    # starting point.
                    used_nodes = {n: False for n in self.d2g.nodes()}
                    best_path = Path(self.d2g)
                    stack = [[last_increase_node]]
                    current_path = Path(self.d2g)
                    if verbose:
                        print("Start again !")
                        import time
                        time.sleep(1)
                    first_pass = False
                    total_nb_steps = 0
                    nb_steps_since_last_score_up = 0

            return best_path