# d2_path.py
import networkx as nx
""" Represent a udg path in a d2g graph
"""
class Path(list):
    """Ordered list of udgs forming a path in a d2g graph.

    The udgs are the list items. Besides them the path tracks:

    * ``penalty``: set to 4 when the first udg is added through
      :meth:`add_path`, then increased by the squared d2g ``"distance"``
      between each newly added udg and the udg preceding it.
    * ``d2g``: the graph the path belongs to. It must expose a
      ``barcode_edge_idxs`` mapping (barcode edge -> index), support
      ``d2g[str(idx_a)][str(idx_b)]["distance"]`` and provide ``clone()``.
      NOTE(review): presumably a networkx graph subclass -- confirm.
    * ``covering_variables``: for every barcode-edge index of the d2g, how
      many times that edge was registered by udgs added via :meth:`add_path`.
    """

    def __init__(self, d2g):
        """Create an empty path over ``d2g`` with no covered barcode edge."""
        super().__init__()
        self.penalty = 0
        self.d2g = d2g
        self.covering_variables = dict.fromkeys(
            self.d2g.barcode_edge_idxs.values(), 0
        )

    def add_path(self, udgs):
        """Append ``udgs`` at the end of the path, updating penalty and coverage.

        Args:
            udgs: iterable of udgs, each exposing ``idx`` and ``edges``. Any
                iterable is accepted (it is materialised once); an empty one
                leaves the path untouched.

        Lookup failures (a barcode edge missing from
        ``d2g.barcode_edge_idxs`` or no ``"distance"`` between two consecutive
        udgs in the d2g) are not caught and propagate to the caller.
        """
        udgs = list(udgs)
        if not udgs:
            return

        # Register the barcode edges of every incoming udg.
        edge_idxs = self.d2g.barcode_edge_idxs
        for udg in udgs:
            for barcode_edge in udg.edges:
                self.covering_variables[edge_idxs[barcode_edge]] += 1

        # Special case for a previously empty path: the first udg has no
        # predecessor to measure a distance against.
        if not self:
            # 4 because it's the ideal case (1 node of difference with same
            # length on 1 shift).
            self.penalty = 4
            self.append(udgs[0])
            udgs = udgs[1:]

        # Add the remaining udgs one by one.
        for udg in udgs:
            # Distance between the new udg and the current end of the path.
            dist = self.d2g[str(udg.idx)][str(self[-1].idx)]["distance"]
            self.penalty += dist ** 2
            self.append(udg)

    def normalized_penalty(self):
        """Return the penalty divided by the number of udgs in the path.

        An empty path yields 0.0 instead of raising ``ZeroDivisionError``.
        """
        if not self:
            return 0.0
        return self.penalty / len(self)

    def covering_score(self):
        """Return the fraction of barcode-edge variables covered at least once.

        Yields 0.0 when the d2g defines no barcode-edge variable at all,
        instead of raising ``ZeroDivisionError``.
        """
        total = len(self.covering_variables)
        if total == 0:
            return 0.0
        covered = sum(1 for count in self.covering_variables.values() if count > 0)
        return covered / total

    def covering_difference(self, path):
        """Count the variables covered by ``path`` that are not covered by self.

        Args:
            path: another object exposing a ``covering_variables`` mapping.

        Returns:
            The number of variables with a positive count in ``path`` whose
            count in this path is not positive (or that this path lacks).
        """
        self_covered = {
            var for var, count in self.covering_variables.items() if count > 0
        }
        return sum(
            1
            for var, count in path.covering_variables.items()
            if count > 0 and var not in self_covered
        )

    def save_path(self, filename):
        """Write the path alone, as a chain graph, to ``filename`` in GEXF.

        Each udg becomes a node keyed by its ``idx`` with ``center``, ``udg``
        (its string form) and ``score`` ("score/optimal score") attributes;
        consecutive udgs are linked by an edge.
        """
        d2p = nx.Graph()

        # Add the nodes with their attributes.
        for udg in self:
            d2p.add_node(
                udg.idx,
                center=udg.center,
                udg=str(udg),
                score=f"{udg.score}/{udg.get_optimal_score()}",
            )

        # Link every udg to its successor in the path.
        for udg1, udg2 in zip(self, self[1:]):
            d2p.add_edge(udg1.idx, udg2.idx)

        nx.write_gexf(d2p, filename)

    def save_path_in_graph(self, filename):
        """Write a clone of the d2g to ``filename`` in GEXF, tagging path nodes.

        Every node of the clone that belongs to the path receives a ``path``
        attribute holding the position of its udg in the path.
        """
        d2c = self.d2g.clone()
        for position, udg in enumerate(self):
            d2c.nodes[str(udg.idx)]["path"] = position
        nx.write_gexf(d2c, filename)
class Unitig(Path):
    """Path that is extended one udg at a time on its right end."""

    def __init__(self, d2g):
        """Create an empty unitig over the ``d2g`` graph."""
        super().__init__(d2g)

    def add_right(self, udg):
        """Append the single ``udg`` at the end of the unitig via ``add_path``."""
        single_udg_path = [udg]
        self.add_path(single_udg_path)
def d2_path_to_barcode_path(path):
    """Print, for each udg of ``path``, its barcode differences with neighbours.

    Each udg is turned into the set of values returned by ``udg.to_list()``.
    One line is printed per udg with two sets:

    * the barcodes of the udg that are absent from the next udg;
    * the barcodes of the udg that are absent from the previous udg.

    The last udg has no successor and the first has no predecessor; for
    those sides the udg's barcode set minus the adjacent difference set is
    printed (or the full barcode set when the path holds a single udg).

    NOTE(review): despite its name the function only prints and returns
    ``None`` -- it looks unfinished; confirm the intended return value.

    Args:
        path: iterable of udgs exposing ``to_list()``. An empty path prints
            nothing; previously empty and single-udg paths raised
            ``IndexError``.
    """
    barcode_per_idx = [set(udg.to_list()) for udg in path]
    if not barcode_per_idx:
        # Nothing to compare.
        return

    # Differences between each udg and its successor, in both directions.
    neighbours = list(zip(barcode_per_idx, barcode_per_idx[1:]))
    diff_barcode_per_idx = [current - following for current, following in neighbours]
    rev_diff_barcode_per_idx = [following - current for current, following in neighbours]

    first, last = barcode_per_idx[0], barcode_per_idx[-1]
    if neighbours:
        # Pad both lists to one entry per udg: the last udg has no successor
        # and the first udg has no predecessor.
        diff_barcode_per_idx.append(last - diff_barcode_per_idx[-1])
        rev_diff_barcode_per_idx.insert(0, first - rev_diff_barcode_per_idx[0])
    else:
        # Single udg: there is no neighbour to subtract on either side.
        diff_barcode_per_idx.append(set(last))
        rev_diff_barcode_per_idx.insert(0, set(first))

    for diff, rev_diff in zip(diff_barcode_per_idx, rev_diff_barcode_per_idx):
        print(diff, rev_diff)