Skip to content
Snippets Groups Projects
Select Git revision
  • eeab1f7a49c42bf7119e67705313915679af6f58
  • master default protected
  • dev
  • score_test
4 results

path_optimization.py

Blame
  • user avatar
    Yoann Dufresne authored
    eeab1f7a
    History
    path_optimization.py 5.71 KiB
    import random
    import networkx as nx
    from collections import Counter
    
    from deconvolution.d2graph.d2_path import Path
    
    
    class Optimizer:
    
        def __init__(self, d2g):
            self.d2g = d2g
            self.solutions = []
    
        def init_random_solutions(self, nb_solutions):
            for _ in range(nb_solutions):
                rnd_sol = Solution(self.d2g)
                rnd_sol.random_init_best_quality()
                self.solutions.append(rnd_sol)
    
        def bb_solution(self, verbose=False):
            nb_steps_since_last_score_up = 0
            total_nb_steps = 0
            first_pass = True
    
            used_nodes = {n:False for n in self.d2g.nodes()}
            coverage = Counter()
    
            # Init solution for
            init = list(used_nodes.keys())
            init = init[random.randint(0, len(init)-1)]
    
            best_path = []
            best_score = 0
            stack = [[init]]
            current_path = []
            last_increase_node = None
    
            while len(stack) > 0:
                total_nb_steps += 1
                # 1 - Get next node to extend
                current_node = stack[-1].pop()
    
                # 2 - Node exploitation
                # 2.1 - Add the node
                used_nodes[current_node] = True
                current_path.append(current_node)
                # 2.2 - Compute the coverage score
                vars = self.d2g.get_covering_variable_node(int(current_node))
                coverage += Counter(vars)
                # 2.3 - If best, save
                if len(coverage) > best_score:
                    nb_steps_since_last_score_up = 0
                    best_path = current_path.copy()
                    best_score = len(coverage)
                    last_increase_node = current_node
                    if verbose:
                        print(f"New best: {len(best_path)} {best_score}")
                else:
                    nb_steps_since_last_score_up += 1
    
                # 3 - Neighbors preparation
                neighbors = [x for x in self.d2g[current_node] if not used_nodes[x]]
                opt = self
    
                def node_sorting_value(node):
                    u = opt.d2g.node_by_idx[int(node)]
                    return (0 if len(Counter(opt.d2g.get_covering_variable_node(int(node))) - coverage) == 0 else 1,
                            - u.get_link_divergence())
                neighbors.sort(key=node_sorting_value)
                stack.append(neighbors)
    
                # 4 - No more neighbors here - back up
                while len(stack[-1]) == 0:
                    stack.pop()
                    previous = current_path.pop()
                    used_nodes[previous] = False
                    prev_vars = self.d2g.get_covering_variable_node(int(previous))
                    coverage -= Counter(prev_vars)
    
                # 5 - Reinit the search from a side and stop when done
                if total_nb_steps > 10000 and nb_steps_since_last_score_up >= total_nb_steps / 2:
                    if first_pass:
                        used_nodes = {n: False for n in self.d2g.nodes()}
                        coverage = Counter()
                        best_path = []
                        best_score = 0
                        stack = [[last_increase_node]]
                        current_path = []
                        if verbose:
                            print("Start again !")
                        first_pass = False
                        total_nb_steps = 0
                        nb_steps_since_last_score_up = 0
                        import time
                        time.sleep(3)
                    else:
                        return best_path
    
            return best_path
    
        def extends_until_end(self, solution):
            while self.extends(solution):
                continue
            solution.reverse()
            while self.extends(solution):
                continue
    
        def extends(self, solution):
            # Get all the neighbors
            cur_id = str(solution[-1].idx)
            neighbors = [self.d2g.node_by_idx[int(x)] for x in self.d2g.neighbors(cur_id) if
                         self.d2g.node_by_idx[int(x)] not in solution]
    
            current_vars = frozenset([x for x, y in solution.covering_variables.items() if y > 0])
            if len(neighbors) == 0:
                return False
    
            # Choose using the multiple optimization directions
            next_udg = min(neighbors,
                           key=lambda x: (1 if len(self.d2g.get_covering_variables(x) - current_vars) == 0 else 0,
                                          x.get_link_divergence(),
                                          self.d2g[str(x.idx)][cur_id]["distance"]))
            solution.add_path([next_udg])
            return True
    
    
    class Solution(Path):
    
        def __init__(self, d2g):
            super(Solution, self).__init__(d2g)
    
        def random_init_best_quality(self):
            nodes = [self.d2g.node_by_idx[int(x)] for x in list(self.d2g.nodes())]
            min_div = (min(nodes, key=lambda x: x.get_link_divergence())).get_link_divergence()
            nodes = [x for x in nodes if x.get_link_divergence() == min_div]
    
            random_udg = random.choice(nodes)
    
            self.clear()
            self.add_path([random_udg])
    
    
        """ Only respect counts for now
        """
        def to_barcode_path(self):
            barcode_per_position = [set(udg.to_sorted_list()) for udg in self]
            compressed_barcodes = []
    
            for idx, barcodes in enumerate(barcode_per_position):
                for barcode in barcodes:
                    offset = 1
                    while idx + offset < len(barcode_per_position) and barcode in barcode_per_position[idx + offset]:
                        barcode_per_position[idx + offset].remove(barcode)
                        offset += 1
                    compressed_barcodes.append(barcode)
    
            return compressed_barcodes
    
        def save_barcode_path(self, filename):
            barcodes = self.to_barcode_path()
    
            G = nx.Graph()
            for idx, barcode in enumerate(barcodes):
                G.add_node(idx)
                G.nodes[idx]["center"] = barcode
    
            nx.write_gexf(G, filename)