diff --git a/deconvolution/d2graph/d2_graph.py b/deconvolution/d2graph/d2_graph.py index ccf94dd81a04c4c0500a96ee8d3b1075ec9a1dac..45d92b6f9f5b76122506ad2f3d22b7759fca43c7 100644 --- a/deconvolution/d2graph/d2_graph.py +++ b/deconvolution/d2graph/d2_graph.py @@ -91,7 +91,7 @@ class D2Graph(nx.Graph): self.bidict_nodes = self.create_graph_from_node_neighborhoods(neighbor_threshold) - def _lcp_from_obj(self, obj): + def get_lcp(self, obj): if type(obj) == str: obj = int(obj) if type(obj) == int: @@ -99,7 +99,7 @@ class D2Graph(nx.Graph): return obj def get_covering_variables(self, obj): - lcp = self._lcp_from_obj(obj) + lcp = self.get_lcp(obj) if lcp not in self.variables_per_lcp: variables = [] for e in lcp.edges: diff --git a/deconvolution/d2graph/d2_path.py b/deconvolution/d2graph/d2_path.py index ca9742d8cc4f76c0327ead1bbfb1168aa9d1fcdc..28d1686ad188a8518550b9e55e3a12d91c0a58a4 100644 --- a/deconvolution/d2graph/d2_path.py +++ b/deconvolution/d2graph/d2_path.py @@ -12,13 +12,7 @@ class Path(list): self.covering_value = 0 def append(self, obj) -> None: - # Transform the input into a lcp - if type(obj) == str: - obj = self.d2g.node_by_idx(int(obj)) - elif type(obj) == int: - obj = self.d2g.node_by_idx(obj) - - lcp = obj + lcp = self.d2g.get_lcp(obj) # Update the covering variables for barcode_edge in lcp.edges: @@ -29,28 +23,40 @@ class Path(list): super(Path, self).append(lcp) + def pop(self, index=-1): + lcp = super(Path, self).pop(index) + + # Update the covering variables + for barcode_edge in lcp.edges: + edge_idx = self.d2g.barcode_edge_idxs[barcode_edge] + self.covering_variables[edge_idx] -= 1 + if self.covering_variables[edge_idx] == 0: + self.covering_value -= 1 + + return lcp + + def copy(self): + copy = Path(self.d2g) + + # Copy the list + for x in self: + super(Path, copy).append(x) + + # Copy the variables + for key, val in self.covering_variables.items(): + copy.covering_variables[key] = val + copy.covering_value = self.covering_value + + return copy + def add_path(self, path): for lcp in path: self.append(lcp) - def covering_score(self): return self.covering_value / len(self.covering_variables) - """ Count the number of variable covered by path that are not self covered. - """ - def covering_difference(self, path): - self_covered = [var for var, num in self.covering_variables.items() if num > 0] - self_covered = frozenset(self_covered) - - num_covered = 0 - for var, val in path.covering_variables.items(): - if val > 0 and var not in self_covered: - num_covered += 1 - - return num_covered - - def save_path(self, filename): + def save_gexf(self, filename): d2p = nx.Graph() # Add the nodes for udg in self: @@ -70,32 +76,3 @@ class Path(list): nx.write_gexf(d2p, filename) - def save_path_in_graph(self, filename): - d2c = self.d2g.clone() - for idx, udg in enumerate(self): - d2c.nodes[str(udg.idx)]["path"] = idx - nx.write_gexf(d2c, filename) - - -class Unitig(Path): - - def __init__(self, d2g): - super(Unitig, self).__init__(d2g) - - def add_right(self, udg): - self.add_path([udg]) - - -def d2_path_to_barcode_path(path): - barcode_per_idx = [set(udg.to_list()) for udg in path] - diff_barcode_per_idx = [] - rev_diff_barcode_per_idx = [] - for idx in range(len(barcode_per_idx)-1): - diff_barcode_per_idx.append(barcode_per_idx[idx] - barcode_per_idx[idx+1]) - rev_diff_barcode_per_idx.append(barcode_per_idx[idx+1] - barcode_per_idx[idx]) - diff_barcode_per_idx.append(barcode_per_idx[-1] - diff_barcode_per_idx[-1]) - rev_diff_barcode_per_idx.insert(0, barcode_per_idx[0] - rev_diff_barcode_per_idx[0]) - - for diff, rev_diff in zip(diff_barcode_per_idx, rev_diff_barcode_per_idx): - print(diff, rev_diff) - diff --git a/deconvolution/d2graph/path_optimization.py b/deconvolution/d2graph/path_optimization.py index cdd73a5ea8578d4f2d2f68e5b21f3c45b77d5564..4ec3fab66ea13cfc30efc26f3dc10197f64dd215 100644 --- a/deconvolution/d2graph/path_optimization.py +++ b/deconvolution/d2graph/path_optimization.py @@ -1,5 +1,4 @@ import random -import networkx as nx from collections import Counter from deconvolution.d2graph.d2_path import Path @@ -9,38 +8,26 @@ class Optimizer: def __init__(self, d2g): self.d2g = d2g - self.solutions = [] - def init_random_solutions(self, nb_solutions): - for _ in range(nb_solutions): - rnd_sol = Solution(self.d2g) - rnd_sol.random_init_best_quality() - self.solutions.append(rnd_sol) - - def bb_solution(self, verbose=False): + def bb_solution(self, start_node=None, verbose=False): nb_steps_since_last_score_up = 0 total_nb_steps = 0 first_pass = True - max_cov = set() - for node in self.d2g.nodes(): - v = self.d2g.get_covering_variables(node) - max_cov.update(v) - max_cov = len(max_cov) - - used_nodes = {n:False for n in self.d2g.nodes()} - coverage = Counter() + used_nodes = {n: False for n in self.d2g.nodes()} # Init solution for - init = list(used_nodes.keys()) - init = init[random.randint(0, len(init)-1)] + if start_node == None: + start_node = list(used_nodes.keys()) + start_node = start_node[random.randint(0, len(start_node)-1)] - best_path = [] - best_score = 0 - stack = [[init]] - current_path = [] + best_path = Path(self.d2g) + stack = [[start_node]] + current_path = Path(self.d2g) last_increase_node = None + max_cov = len(current_path.covering_variables) + while len(stack) > 0: total_nb_steps += 1 # 1 - Get next node to extend @@ -50,19 +37,15 @@ class Optimizer: # 2.1 - Add the node used_nodes[current_node] = True current_path.append(current_node) - # 2.2 - Compute the coverage score - vars = self.d2g.get_covering_variables(current_node) - coverage += Counter(vars) # 2.3 - If best, save - if len(coverage) > best_score: + if current_path.covering_value > best_path.covering_value: nb_steps_since_last_score_up = 0 best_path = current_path.copy() - best_score = len(coverage) last_increase_node = current_node if verbose: - print(f"New best: {len(best_path)} {best_score} / {max_cov}") + print(f"New best: {len(best_path)} {best_path.covering_value} / {max_cov}") - if len(coverage) == max_cov: + if best_path.covering_value == max_cov: print("Max coverage") return best_path else: @@ -74,33 +57,31 @@ class Optimizer: def node_sorting_value(node): u = opt.d2g.node_by_idx[int(node)] - return (0 if len(Counter(opt.d2g.get_covering_variables(node)) - coverage) == 0 else 1, + return (0 if len(opt.d2g.get_covering_variables(node)) - current_path.covering_value == 0 else 1, - u.get_link_divergence(), - - self.d2g[str(u.idx)][str(current_node)]["distance"]) + - opt.d2g[str(u.idx)][str(current_node)]["distance"]) neighbors.sort(key=node_sorting_value) stack.append(neighbors) # 4 - No more neighbors here - back up while len(stack[-1]) == 0: stack.pop() + if len(stack) == 0: + break previous = current_path.pop() used_nodes[previous] = False - prev_vars = self.d2g.get_covering_variables(previous) - coverage -= Counter(prev_vars) # 5 - Reinit the search from a side and stop when done - if total_nb_steps > 10000 and nb_steps_since_last_score_up >= total_nb_steps / 2: + if len(stack) == 0 or (total_nb_steps > 10000 and nb_steps_since_last_score_up >= total_nb_steps / 2): if first_pass: used_nodes = {n: False for n in self.d2g.nodes()} - coverage = Counter() - best_path = [] - best_score = 0 + best_path = Path(self.d2g) stack = [[last_increase_node]] - current_path = [] + current_path = Path(self.d2g) if verbose: print("Start again !") import time - time.sleep(3) + time.sleep(1) first_pass = False total_nb_steps = 0 nb_steps_since_last_score_up = 0 @@ -108,71 +89,3 @@ class Optimizer: return best_path return best_path - - def extends_until_end(self, solution): - while self.extends(solution): - continue - solution.reverse() - while self.extends(solution): - continue - - def extends(self, solution): - # Get all the neighbors - cur_id = str(solution[-1].idx) - neighbors = [self.d2g.node_by_idx[int(x)] for x in self.d2g.neighbors(cur_id) if - self.d2g.node_by_idx[int(x)] not in solution] - - current_vars = frozenset([x for x, y in solution.covering_variables.items() if y > 0]) - if len(neighbors) == 0: - return False - - # Choose using the multiple optimization directions - next_udg = min(neighbors, - key=lambda x: (1 if len(self.d2g.get_covering_variables(x) - current_vars) == 0 else 0, - x.get_link_divergence(), - self.d2g[str(x.idx)][cur_id]["distance"])) - solution.add_path([next_udg]) - return True - - -class Solution(Path): - - def __init__(self, d2g): - super(Solution, self).__init__(d2g) - - def random_init_best_quality(self): - nodes = [self.d2g.node_by_idx[int(x)] for x in list(self.d2g.nodes())] - min_div = (min(nodes, key=lambda x: x.get_link_divergence())).get_link_divergence() - nodes = [x for x in nodes if x.get_link_divergence() == min_div] - - random_udg = random.choice(nodes) - - self.clear() - self.add_path([random_udg]) - - - """ Only respect counts for now - """ - def to_barcode_path(self): - barcode_per_position = [set(udg.to_sorted_list()) for udg in self] - compressed_barcodes = [] - - for idx, barcodes in enumerate(barcode_per_position): - for barcode in barcodes: - offset = 1 - while idx + offset < len(barcode_per_position) and barcode in barcode_per_position[idx + offset]: - barcode_per_position[idx + offset].remove(barcode) - offset += 1 - compressed_barcodes.append(barcode) - - return compressed_barcodes - - def save_barcode_path(self, filename): - barcodes = self.to_barcode_path() - - G = nx.Graph() - for idx, barcode in enumerate(barcodes): - G.add_node(idx) - G.nodes[idx]["center"] = barcode - - nx.write_gexf(G, filename) diff --git a/deconvolution/main/d2_to_path.py b/deconvolution/main/d2_to_path.py index 3d42a19cca21c0659149858eece01c0ab7c0b3a0..62824f7cdf617b9ab0bf174739f8dfcdf5ec3e46 100755 --- a/deconvolution/main/d2_to_path.py +++ b/deconvolution/main/d2_to_path.py @@ -43,24 +43,12 @@ def main(): # Start optimization optimizer = po.Optimizer(largest_component) - nodes = optimizer.bb_solution(verbose=args.verbose) - solution = po.Solution(largest_component) - solution.add_path([largest_component.node_by_idx[int(n)] for n in nodes]) - # optimizer.init_random_solutions(1) + path = optimizer.bb_solution(verbose=args.verbose) - # solution = optimizer.solutions[0] - # print("Solution creation...") - # optimizer.extends_until_end(solution) - print(f"covering score: {solution.covering_score()}") - # - # solution.save_path_in_graph(f"{args.out_prefix}_d2_path.gexf") - solution.save_path(f"{args.out_prefix}_path.gexf") - # solution.save_barcode_path(f"{args.out_prefix}_barcode_count.gexf") + print(f"covering score: {path.covering_score()}") + path.save_gexf(f"{args.out_prefix}_path.gexf") print("Solution saved") - #Â from d2_path import d2_path_to_barcode_path - # d2_path_to_barcode_path(solution) - if __name__ == "__main__": main()