Commit 5cc89481 authored by Yoann Dufresne's avatar Yoann Dufresne

refactor lcp algorithms

parent 2025c898
......@@ -144,7 +144,7 @@ class LcpGraph(nx.Graph):
def create_graph_from_node_neighborhoods(self, neighborhood_threshold=0.25):
nodes = {}
# Create the nodes of d2g from udgs
# Create the nodes of lcpg from udgs
for lcp in self.all_lcp:
nodes[lcp] = lcp.idx
self.add_node(nodes[lcp])
......
......@@ -3,14 +3,14 @@ from collections import Counter
import sys
""" Represent an udg path into a d2g graph
""" Represent an udg path into a lcpg graph
"""
class Path(list):
def __init__(self, d2g):
def __init__(self, lcpg):
super(Path, self).__init__()
self.d2g = d2g
self.covering_variables = {x: 0 for x in self.d2g.variables}
self.lcpg = lcpg
self.covering_variables = {x: 0 for x in self.lcpg.variables}
self.covering_value = 0
# a succession of Counter (multiset)
self.barcode_order = []
......@@ -19,7 +19,7 @@ class Path(list):
self.size_score = 0
def append(self, obj) -> None:
lcp = self.d2g.get_lcp(obj)
lcp = self.lcpg.get_lcp(obj)
# Update the covering variables
for edge_idx in lcp.edges:
......@@ -100,7 +100,7 @@ class Path(list):
self.barcode_order.pop(set_idx)
def copy(self):
copy = Path(self.d2g)
copy = Path(self.lcpg)
# Copy the list
for x in self:
......
......@@ -3,168 +3,41 @@ import networkx as nx
# from deconvolution.lcpgraph.d2_path import Unitig
""" Remove unnecessary transitions
"""
def transitive_reduction(d2g):
def transitive_reduction(lcpg):
""" Remove unnecessary transitions
"""
# Remove self edges
for edge in d2g.edges():
for edge in lcpg.edges():
if edge[0] == edge[1]:
d2g.remove_edge(*edge)
lcpg.remove_edge(*edge)
# Analyse the graph for removable edges
to_remove = set()
for edge in d2g.edges():
dg1_name, dg2_name = edge
# Extract dgs
dg1 = d2g.node_by_idx[int(dg1_name)]
dg2 = d2g.node_by_idx[int(dg2_name)]
for edge in lcpg.edges():
lcp1_name, lcp2_name = edge
# Extract common neighbors
nei1 = frozenset(d2g.neighbors(dg1_name))
nei2 = frozenset(d2g.neighbors(dg2_name))
nei1 = frozenset(lcpg.neighbors(lcp1_name))
nei2 = frozenset(lcpg.neighbors(lcp2_name))
common = nei1.intersection(nei2)
# Look for all the common neighbors, if edge must be remove or not
current_dist = d2g[dg1_name][dg2_name]["distance"]
current_dist = lcpg[lcp1_name][lcp2_name]["distance"]
for node in common:
com_dg = d2g.node_by_idx[int(node)]
extern_dist = d2g[dg1_name][node]["distance"] + d2g[node][dg2_name]["distance"]
com_dg = lcpg.node_by_idx[int(node)]
extern_dist = lcpg[lcp1_name][node]["distance"] + lcpg[node][lcp2_name]["distance"]
# If better path, remove the edge
if extern_dist <= current_dist:
to_remove.add((dg1_name, dg2_name))
to_remove.add((lcp1_name, lcp2_name))
# Remove the edges
for edge in to_remove:
dg1_name, dg2_name = edge
lcp1_name, lcp2_name = edge
# Remove from graph
d2g.remove_edge(dg1_name, dg2_name)
lcpg.remove_edge(lcp1_name, lcp2_name)
print(f"{len(to_remove)} edge removed")
print(f"{len(d2g.edges())} remaining")
""" For each node of the d2 graph, construct a node in the reduced graph.
Then, for each node, compute the closest neighbors in d2 (with equal scores) and add an edge
in the greedy graph.
@param d2 Input d2 graph (with distances already computed)
@return A greedy constructed graph.
"""
def greedy_reduct(d2):
gG = nx.Graph()
for node in d2.nodes:
gG.add_node(node)
for dgraph, node in d2.nodes.items():
if not dgraph.idx in d2.distances or len(d2.distances[dgraph.idx]) == 0:
continue
distances = d2.distances[dgraph.idx]
min_dist = min(distances.values())
for graph_idx, dist in distances.items():
if dist == min_dist:
gG.add_edge(node, d2.nodes[d2.node_by_idx[graph_idx]])
return gG
def filter_singeltons(graph):
""" Remove the isolated nodes from graph.
"""
nodelist = list(graph.nodes())
for node in nodelist:
if len(graph[node]) == 0:
graph.remove_node(node)
return graph
print(f"{len(lcpg.edges())} remaining")
# """ Compute the unambiguous paths in a d2g. The d2g must not contain singletons.
# The unitigs are sorted by increasing penalty first and decreasing size second.
# @param d2g a d2g graph
# @return a list of unitigs
# """
# def compute_unitigs(d2g):
# unitigs = []
# used_nodes = {int(node): False for node in d2g.nodes()}
#
# for node_id in d2g.nodes():
# node_idx = int(node_id)
# node = d2g.node_by_idx[node_idx]
#
# # If already tested
# if used_nodes[node.idx]:
# continue
# used_nodes[node.idx] = True
#
# if d2g.degree(str(node.idx)) > 2:
# # Not a unitig
# continue
#
# # Create new unitigs
# unitig = compute_unitig_from(d2g, node)
# unitigs.append(unitig)
# for node in unitig:
# used_nodes[node.idx] = True
#
# # Sort the unitigs
# unitigs.sort(key=lambda x: (x.normalized_penalty(), -len(x)))
#
# return unitigs
#
#
# """ Compute a unitig starting from the node regarding the graph.
# The unitig will be extended on both sides
# @param d2g The d2g to look for unitig
# @param node The start node of the unitig
# @return The constructed unitig
# """
# def compute_unitig_from(d2g, node):
# unitig = Unitig(d2g)
# unitig.add_path([node])
# if d2g.degree(str(node.idx)) == 2:
# left, right = d2g.neighbors(str(node.idx))
# else:
# left = next(d2g.neighbors(str(node.idx)))
# right = None
#
# # Extends first side
# prev_node = node
# current_node = d2g.node_by_idx[int(left)]
# unitig = extend_unitig(unitig, d2g, prev_node, current_node)
# unitig.revert()
#
# # Extends second side
# prev_node = node
# current_node = d2g.node_by_idx[int(right)] if right is not None else None
# unitig = extend_unitig(unitig, d2g, prev_node, current_node)
#
# return unitig
#
#
# """ Directional extension of the unitig. Uses the previous and current nodes to detect the extension direction.
# @param unitig The unitig to extend. Will be modified during the execution.
# @param d2g The d2g to follow for extension
# @param prev_node Node already added in the unitig
# @param current_node Node to add into the unitig and used to select the next node to add. If not set, stop the extension.
# @return Return the modified unitig.
# """
# def extend_unitig(unitig, d2g, prev_node, current_node):
# if current_node is None:
# return unitig
#
# # Add the node
# unitig.add_right(current_node)
#
# # Look for the next node
# next_candidates = list(d2g.neighbors(str(current_node.idx)))
# next_candidates.remove(str(prev_node.idx))
# # Select next node
# next_node = d2g.node_by_idx[int(next_candidates[0])] if len(next_candidates) == 1 else None
# # Continue extension
# return extend_unitig(unitig, d2g, current_node, next_node)
......@@ -34,7 +34,7 @@ def construct_path_from_unitigs(unitigs, d2g):
""" Look for the next unitig to join from right endpoint of the path
@param path Currently created path
@param unitigs List of unitigs of interest (be careful to fill this list with unitigs that add new covered variables
@param d2g The d2g to use as path support
@param lcpg The lcpg to use as path support
@return (path_to, utg) Respectively the path to the next unitig of interest (from the right of the current path) and
the corresponding unitig (linked to the path from left to right)
"""
......@@ -75,8 +75,8 @@ def _search_way_to_next_unitig(path, unitigs, d2g):
""" Search the min-penalty paths from a start udg to any of the target nodes.
@param start_udg The start node of the path
@param targets A list of udg of interest
@param d2g The d2g to follow
@param forbidden_udgs excluded nodes from the d2g
@param lcpg The lcpg to follow
@param forbidden_udgs excluded nodes from the lcpg
@return A list of equivalent paths
"""
def _search_endpoint(start_udg, targets, d2g, forbidden_udgs):
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment