Skip to content
Snippets Groups Projects
Select Git revision
  • d8a1e50623f9a7b0f43178c473a848e6ea032395
  • master default protected
  • dev
  • score_test
4 results

path_optimization.py

Blame
  • path_optimization.py 5.33 KiB
    import random
    from collections import Counter
    
    from deconvolution.d2graph.d2_path import Path
    
    
    class Optimizer:
    
        def __init__(self, d2g):
            self.d2g = d2g
            self.solution = Path(d2g)
            self.used_nodes = {n: False for n in self.d2g.nodes()}
            self.stack = []
    
        def bb_solution(self, start_node=None, verbose=False):
            nb_steps_since_last_score_up = 0
            total_nb_steps = 0
            first_pass = True
    
            # Init solution for
            if start_node is None:
                start_node = list(self.used_nodes.keys())
                start_node = start_node[random.randint(0, len(start_node)-1)]
    
            best_path = Path(self.d2g)
            self.stack.append([start_node])
            self.solution = Path(self.d2g)
            last_increase_node = None
    
            max_cov = len(self.solution.covering_variables)
    
            while len(self.stack) > 0:
                total_nb_steps += 1
                # 1 - Get next node to extend
                current_node = self.stack[-1].pop()
    
                # 2 - Node exploitation
                # 2.1 - Add the node
                self.used_nodes[current_node] = True
                self.solution.append(current_node)
                # 2.3 - If best, save
                if self.solution.covering_value > best_path.covering_value:
                    nb_steps_since_last_score_up = 0
                    best_path = self.solution.copy()
                    last_increase_node = current_node
                    if verbose:
                        print(f"New best: {len(best_path)} {best_path.covering_value} / {max_cov} ({best_path.barcode_score})")
    
                    if best_path.covering_value == max_cov:
                        print("Max coverage")
                        self.solution = best_path
                        return best_path
                else:
                    nb_steps_since_last_score_up += 1
    
                # 3 - Neighbors preparation
                neighbors = []
                neighbors_scores = {}
                for nei in self.d2g[current_node]:
                    if not self.used_nodes[nei]:
                        neighbors.append(nei)
                        neighbors_scores[nei] = self.explore_scores(nei, max_depth=1)
    
                # opt = self
                # def node_sorting_value(node):
                #     u = opt.d2g.node_by_idx[int(node)]
                #     return (0 if len(opt.d2g.get_covering_variables(node)) - opt.solution.covering_value == 0 else 1,
                #             - u.get_link_divergence(),
                #             - opt.d2g[str(u.idx)][str(current_node)]["distance"])
                # def node_sorting_value(node):
                #     score = current_path.predict_score(node)
                #     # align.insert(0, len(align))
                #     return - score
                # neighbors.sort(key=node_sorting_value)
                neighbors.sort(key=lambda x: neighbors_scores[x])
                self.stack.append(neighbors)
    
                # 4 - No more neighbors here - back up
                while len(self.stack[-1]) == 0:
                    self.stack.pop()
                    if len(self.stack) == 0:
                        break
                    previous = self.solution.pop()
                    self.used_nodes[previous] = False
    
                # 5 - Reinit the search from a side and stop when done
                if len(self.stack) == 0 or (total_nb_steps > 10000 and nb_steps_since_last_score_up >= total_nb_steps / 10):
                    if first_pass:
                        self.used_nodes = {n: False for n in self.d2g.nodes()}
                        best_path = Path(self.d2g)
                        self.stack = [[last_increase_node]]
                        self.solution = Path(self.d2g)
                        if verbose:
                            print("Start again !")
                            import time
                            time.sleep(1)
                        first_pass = False
                        total_nb_steps = 0
                        nb_steps_since_last_score_up = 0
                    else:
                        self.solution = best_path
                        return best_path
    
            self.solution = best_path
            return best_path
    
        def explore_scores(self, node, max_depth=0):
            pre_score = self.solution.barcode_score
            self.solution.append(node)
            self.used_nodes[node] = True
    
            po = self
            def neighbor_generator(n):
                for nei in po.d2g[n]:
                    if not po.used_nodes[nei]:
                        yield nei
    
            max_score = self.solution.barcode_score
    
            local_stack = [neighbor_generator(node)]
            current_depth = 0
            while len(local_stack) > 0:
                if current_depth < max_depth:
                    gen = local_stack[-1]
                    try:
                        next_node = next(gen)
                        self.solution.append(next_node)
                        self.used_nodes[next_node] = True
                        if self.solution.barcode_score > max_score:
                            max_score = self.solution.barcode_score
                        local_stack.append(neighbor_generator(next_node))
                        current_depth += 1
                    except StopIteration:
                        local_stack.pop()
                        nei = self.solution.pop()
                        self.used_nodes[str(nei.idx)] = False
                        current_depth -= 1
                else:
                    local_stack.pop()
                    nei = self.solution.pop()
                    self.used_nodes[str(nei.idx)] = False
                    current_depth -= 1
    
    
            return max_score - pre_score