Commit 367747d2 authored by Yoann Dufresne's avatar Yoann Dufresne
Browse files

compute unitigs

parent c0020617
......@@ -50,7 +50,7 @@ def transitive_reduction(d2g):
print(f"{len(nxg.edges())} remaining")
""" For each node of the d2 graph, construct a node in the reducted graph.
""" For each node of the d2 graph, construct a node in the reduced graph.
Then, for each node, compute the closest neighbors in d2 (with equal scores) and add an edge
in the greedy graph.
@param d2 Input d2 graph (with distances already computed)
......@@ -88,94 +88,81 @@ def filter_singeltons(graph):
return graph
def compute_unitigs(graph, d2g):
""" Compute the unambiguious paths
""" Compute the unambiguous paths in a graph. The graph must not contain singletons.s
@param graph a networkx graph
@return a list of unitigs
def compute_unitigs(graph):
unitigs = []
nodelist = list(graph.nodes())
already_used = {node:False for node in nodelist}
used_nodes = {node:False for node in graph.nodes()}
current_unitig = Unitig()
for node in nodelist:
# skip the node if already in use in some unitig
if already_used[node]:
for node in graph.nodes():
# If already tested
if used_nodes[node]:
used_nodes[node] = True
# Start the unitig
unitig = Unitig(d2g, [node])
already_used[node] = True
# Save the unitig
# If 1 or 2 neighbors, part of the unitig
# Change unitig if not
if len(graph[node]) > 2 or len(graph[node]) == 0:
if > 2:
# Not a unitig
# Randomly determine the right and a left neighbors
neighbors = list(graph[node])
adding_functions = [unitig.add_right]
if len(neighbors) == 2:
# --- Explore the sides of the unitig ---
for idx in range(len(neighbors)):
previous, current = node, neighbors[idx]
while len(graph[current]) == 2 and not already_used[current]:
already_used[current] = True
# Go to next neighbor
neighbors = list(graph[current])
new_node = neighbors[idx] if neighbors[idx] != previous else neighbors[(idx+1)%2]
previous = current
current = new_node
# Add the extremity of the unitig
if len(graph[current]) == 1:
already_used[current] = True
# Create new unitigs
unitig = compute_unitig_from(graph, node)
for node in unitig:
used_nodes[node] = True
return unitigs
def compute_path_from_unitigs(d2g, unitigs):
path = Path()
# Create an edge using count.
included_edges = {}
for e in d2g.graph.edges():
if e[0] < e[1]:
included_edges[(e[0], e[1])] = False
included_edges[(e[1], e[0])] = False
remaining_edges = len(included_edges)
# Sort the unitigs by decreasing size
unitigs.sort(key=lambda x:len(x.nodes), reverse=True)
# Select the first unitig that contain at least 1 uncovered edge
used_unitigs = []
nb_used = 0
for unitig in unitigs:
if remaining_edges == 0:
# Compute usabable edges
edge_found = False
for edge in unitig.get_original_edges():
if not included_edges[edge]:
edge_found = True
included_edges[edge] = True
remaining_edges -= 1
# Nothing to improve the solution
if not edge_found:
# Add it to the path
nb_used += 1
print(f"Unitig used: {nb_used}/{len(unitigs)}")
return path
""" Compute a unitig starting from the node regarding the graph.
The unitig will be extended on both sides
@param graph The graph to look for unitig
@param node The start node of the unitig
@return The constructed unitig
def compute_unitig_from(graph, node):
unitig = Unitig(nodes=[node])
if == 2:
left, right = graph.neighbors(node)
left = next(graph.neighbors(node))
right = None
# Extends first side
prev_node = node
current_node = left
unitig = extend_unitig(unitig, graph, prev_node, current_node)
# Extends second side
prev_node = node
current_node = right
unitig = extend_unitig(unitig, graph, prev_node, current_node)
return unitig
""" Directional extension of the unitig. Uses the previous and current nodes to detect the extension direction.
@param unitig The unitig to extend. Will be modified during the execution.
@param graph The graph to follow for extension
@param prev_node Node already added in the unitig
@param current_node Node to add into the unitig and used to select the next node to add. If not set, stop the extension.
@return Return the modified unitig.
def extend_unitig(unitig, graph, prev_node, current_node):
if current_node == None:
return unitig
# Add the node
# Look for the next node
next_candidates = list(graph.neighbors(current_node))
# Select next node
next_node = next_candidates[0] if len(next_candidates) == 1 else None
# Continue extension
return extend_unitig(unitig, graph, current_node, next_node)
import networkx as nx
class Path(object):
class Path(list):
def __init__(self):
def __init__(self, nodes=[]):
super(Path, self).__init__()
self.nodes = []
self.nodes = [x for x in nodes]
def add_nodes(self, nodes):
......@@ -16,6 +16,10 @@ class Path(object):
def revert(self):
self.nodes = [x for x in self.nodes[::-1]]
def get_score(self):
return 0
......@@ -24,31 +28,10 @@ class Path(object):
return f"[{','.join([str(x) for x in self.nodes])}]"
def save_d2(self, d2g, file):
if len(self.nodes) == 0:
graph_path = nx.Graph()
for idx, node in enumerate(self.nodes[1:]):
dg1 = d2g.nodes.inverse[node]
prev_node = self.nodes[idx]
dg2 = d2g.nodes.inverse[prev_node]
if dg1 in d2g.distances[dg2]:
graph_path.add_edge(node, prev_node)
nx.write_gexf(graph_path, file)
class Unitig(Path):
def __init__(self, d2g, node_list):
super(Unitig, self).__init__()
self.nodes = node_list
self.d2g = d2g
def __init__(self, nodes=[]):
super(Unitig, self).__init__(nodes)
def add_left(self, node):
......@@ -59,35 +42,8 @@ class Unitig(Path):
def get_original_edges(self):
edges = []
for node in self.nodes:
dg = self.d2g.nodes.inverse[node]
return frozenset(edges)
class UnitigGraph():
def __init__(self, d2g, unitigs):
self.d2g = dg2
self.unitigs = unitigs
def compute_unitig_graph(self):
ug = nx.Graph()
# Get unitig extremities and index them
borders = {}
for unitig in self.unitigs:
borders.append[unitig.nodes[0]] = unitig
if len(unitig.nodes) > 1:
borders.append[unitig.nodes[-1]] = unitig
border_nodes = frozenset(borders.keys())
# link borders together
for unitig in self.unitigs:
#!/usr/bin/env python3
import networkx as nx
import argparse
import sys
import d2_graph as d2
from d2_algorithms import compute_unitigs
def parse_arguments():
parser = argparse.ArgumentParser(description='Greedy construction of a path through the d2 graph.')
parser.add_argument('barcode_graph', help='The barcode graph file. Must be a gefx formated file.')
parser.add_argument('d2_graph', help='d2 graph to reduce. Must be a gefx formated file.')
parser.add_argument('--output_path', '-o', default="d2_path.gexf", help="Output file prefix.")
args = parser.parse_args()
return args
def main():
# Parsing the arguments and validate them
args = parse_arguments()
barcode_file = args.barcode_graph
d2_file = args.d2_graph
if (not barcode_file.endswith('.gexf')) or (not d2_file.endswith(".gexf")):
print("Inputs file must be gexf formatted", file=sys.stderr)
# Loading
G = nx.read_gexf(barcode_file)
d2g = d2.D2Graph(G)
# Take the principal component
largest_component_nodes = max(nx.connected_components(d2g.nx_graph), key=len)
largest_component = d2g.nx_graph.subgraph(largest_component_nodes)
unitigs = compute_unitigs(largest_component)
# Write the simplificated graph
# nx.write_gexf(d2g.nx_graph, args.output_d2_name)
if __name__ == "__main__":
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment