# path_optimization.py
import random
import networkx as nx
from collections import Counter
from deconvolution.d2graph.d2_path import Path
class Optimizer:
    """Build paths over a d2 graph that cover as many variables as possible.

    The wrapped graph ``d2g`` is only accessed through the operations used
    in this class: ``nodes()``, ``neighbors()``, item access
    (``d2g[node]`` and ``d2g[a][b]["distance"]``), ``node_by_idx``,
    ``get_covering_variable_node()`` and ``get_covering_variables()``.
    Graph node keys are converted with ``int()`` before being used as
    indexes into ``node_by_idx``.
    """

    def __init__(self, d2g):
        """Store the graph to optimise on and start with no solution."""
        self.d2g = d2g
        self.solutions = []

    def init_random_solutions(self, nb_solutions):
        """Append ``nb_solutions`` randomly seeded solutions to ``self.solutions``."""
        for _ in range(nb_solutions):
            rnd_sol = Solution(self.d2g)
            rnd_sol.random_init_best_quality()
            self.solutions.append(rnd_sol)

    def _neighbor_priority(self, node, coverage):
        """Return the sort key used to order the neighbours to explore.

        Candidates are popped from the END of the sorted list, so the node
        explored first is the one with the largest key: a node bringing at
        least one variable absent from ``coverage`` (first field 1), and
        among those the one with the smallest link divergence.
        """
        idx = int(node)
        new_vars = Counter(self.d2g.get_covering_variable_node(idx)) - coverage
        unit = self.d2g.node_by_idx[idx]
        return (1 if new_vars else 0, -unit.get_link_divergence())

    def bb_solution(self, verbose=False, min_steps=10000):
        """Depth-first search for a path covering as many variables as possible.

        The search starts from a randomly chosen node.  Once more than
        ``min_steps`` steps were performed and at least half of them did not
        improve the score, the search is restarted once from the node that
        produced the last improvement; the second time the criterion is met
        the best path of the second pass is returned.

        Args:
            verbose: print a message on each new best path and on restart.
            min_steps: minimum number of steps of a pass before the
                stagnation criterion is evaluated.

        Returns:
            The best path found, as a list of graph node keys.  The list is
            empty when the graph has no node.
        """
        used_nodes = dict.fromkeys(self.d2g.nodes(), False)
        if not used_nodes:
            # Nothing to explore on an empty graph.
            return []

        nb_steps_since_last_score_up = 0
        total_nb_steps = 0
        first_pass = True
        coverage = Counter()

        # Init solution from a random node
        start = random.choice(list(used_nodes))
        best_path = []
        best_score = 0
        # stack[i] holds the candidates still to try at depth i, hence
        # len(stack) == len(current_path) + 1 between two steps.
        stack = [[start]]
        current_path = []
        last_increase_node = None

        while stack:
            total_nb_steps += 1

            # 1 - Get next node to extend
            current_node = stack[-1].pop()

            # 2 - Node exploitation
            # 2.1 - Add the node
            used_nodes[current_node] = True
            current_path.append(current_node)
            # 2.2 - Compute the coverage score
            covered = self.d2g.get_covering_variable_node(int(current_node))
            coverage += Counter(covered)
            # 2.3 - If best, save
            if len(coverage) > best_score:
                nb_steps_since_last_score_up = 0
                best_path = current_path.copy()
                best_score = len(coverage)
                last_increase_node = current_node
                if verbose:
                    print(f"New best: {len(best_path)} {best_score}")
            else:
                nb_steps_since_last_score_up += 1

            # 3 - Neighbors preparation
            neighbors = [x for x in self.d2g[current_node] if not used_nodes[x]]
            neighbors.sort(key=lambda node: self._neighbor_priority(node, coverage))
            stack.append(neighbors)

            # 4 - No more neighbors here - back up
            while stack and not stack[-1]:
                stack.pop()
                if not current_path:
                    # The root level is exhausted: the whole search space
                    # was explored and the main loop must stop.
                    break
                previous = current_path.pop()
                used_nodes[previous] = False
                prev_vars = self.d2g.get_covering_variable_node(int(previous))
                coverage -= Counter(prev_vars)

            # 5 - Reinit the search from a side and stop when done
            if (total_nb_steps > min_steps
                    and nb_steps_since_last_score_up >= total_nb_steps / 2):
                if not first_pass:
                    return best_path

                # Fall back on the start node if no step ever improved the score.
                restart_node = start if last_increase_node is None else last_increase_node
                used_nodes = dict.fromkeys(self.d2g.nodes(), False)
                coverage = Counter()
                best_path = []
                best_score = 0
                stack = [[restart_node]]
                current_path = []
                if verbose:
                    print("Start again !")
                first_pass = False
                total_nb_steps = 0
                nb_steps_since_last_score_up = 0

        return best_path

    def extends_until_end(self, solution):
        """Greedily extend ``solution`` at one end, reverse it, then extend again."""
        while self.extends(solution):
            pass
        solution.reverse()
        while self.extends(solution):
            pass

    def extends(self, solution):
        """Append the best neighbour of the last node of ``solution``.

        Neighbours already present in ``solution`` are ignored.  The chosen
        one minimises, in this order: whether it brings no variable that is
        not already covered by the solution, its link divergence, and the
        ``"distance"`` attribute of the edge linking it to the last node.

        Returns:
            True when a node was added, False when no neighbour is available.
        """
        # Get all the neighbors
        cur_id = str(solution[-1].idx)
        neighbors = []
        for node_id in self.d2g.neighbors(cur_id):
            unit = self.d2g.node_by_idx[int(node_id)]
            if unit not in solution:
                neighbors.append(unit)

        if not neighbors:
            return False

        current_vars = frozenset(
            var for var, count in solution.covering_variables.items() if count > 0
        )

        def selection_key(unit):
            brings_nothing = not (self.d2g.get_covering_variables(unit) - current_vars)
            return (1 if brings_nothing else 0,
                    unit.get_link_divergence(),
                    self.d2g[str(unit.idx)][cur_id]["distance"])

        # Choose using the multiple optimization directions
        next_udg = min(neighbors, key=selection_key)
        solution.add_path([next_udg])
        return True
class Solution(Path):
    """A ``Path`` over the d2 graph with random seeding and barcode export."""

    def __init__(self, d2g):
        """Create an empty solution bound to the graph ``d2g``."""
        super().__init__(d2g)

    def random_init_best_quality(self):
        """Reset the solution to one random node of minimal link divergence.

        Raises:
            ValueError: if the graph has no node (``min`` of an empty list).
        """
        nodes = [self.d2g.node_by_idx[int(x)] for x in self.d2g.nodes()]
        # Evaluate each divergence once and reuse it for the filtering.
        divergences = [node.get_link_divergence() for node in nodes]
        min_div = min(divergences)
        candidates = [node for node, div in zip(nodes, divergences) if div == min_div]

        random_udg = random.choice(candidates)
        self.clear()
        self.add_path([random_udg])

    def to_barcode_path(self):
        """Flatten the path into a list of barcodes.

        A barcode present at several consecutive positions of the path is
        emitted only once, at the first position of that run; it is emitted
        again if it reappears after a position that does not contain it.
        Within one position, barcodes follow the order returned by
        ``to_sorted_list()`` so that the result is reproducible.

        Original author's note: only respect counts for now.
        """
        ordered_per_position = [list(udg.to_sorted_list()) for udg in self]
        # Barcodes of each position that were not emitted yet.
        pending_per_position = [set(ordered) for ordered in ordered_per_position]
        nb_positions = len(pending_per_position)

        compressed_barcodes = []
        for idx, ordered in enumerate(ordered_per_position):
            pending = pending_per_position[idx]
            for barcode in ordered:
                if barcode not in pending:
                    # Already emitted by a run started at an earlier position
                    # (or listed twice at this position).
                    continue
                pending.discard(barcode)

                # Consume the barcode from the following consecutive positions.
                offset = 1
                while (idx + offset < nb_positions
                       and barcode in pending_per_position[idx + offset]):
                    pending_per_position[idx + offset].remove(barcode)
                    offset += 1
                compressed_barcodes.append(barcode)

        return compressed_barcodes

    def save_barcode_path(self, filename):
        """Write the barcode path to ``filename`` in GEXF format.

        The graph written has one node per barcode and no edge; the node
        identifier is the position in the barcode path and the barcode is
        stored in the ``center`` attribute of the node.
        """
        barcodes = self.to_barcode_path()

        G = nx.Graph()
        for idx, barcode in enumerate(barcodes):
            G.add_node(idx, center=barcode)

        nx.write_gexf(G, filename)