Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Yoann DUFRESNE
linked reads molecule ordering
Commits
678f0477
Commit
678f0477
authored
May 16, 2020
by
Yoann Dufresne
Browse files
path search refactoring
parent
05916aa3
Changes
4
Hide whitespace changes
Inline
Side-by-side
deconvolution/d2graph/d2_graph.py
View file @
678f0477
...
...
@@ -91,7 +91,7 @@ class D2Graph(nx.Graph):
self
.
bidict_nodes
=
self
.
create_graph_from_node_neighborhoods
(
neighbor_threshold
)
def
_lcp
_from_obj
(
self
,
obj
):
def
get
_lcp
(
self
,
obj
):
if
type
(
obj
)
==
str
:
obj
=
int
(
obj
)
if
type
(
obj
)
==
int
:
...
...
@@ -99,7 +99,7 @@ class D2Graph(nx.Graph):
return
obj
def
get_covering_variables
(
self
,
obj
):
lcp
=
self
.
_lcp
_from_obj
(
obj
)
lcp
=
self
.
get
_lcp
(
obj
)
if
lcp
not
in
self
.
variables_per_lcp
:
variables
=
[]
for
e
in
lcp
.
edges
:
...
...
deconvolution/d2graph/d2_path.py
View file @
678f0477
...
...
@@ -12,13 +12,7 @@ class Path(list):
self
.
covering_value
=
0
def
append
(
self
,
obj
)
->
None
:
# Transform the input into a lcp
if
type
(
obj
)
==
str
:
obj
=
self
.
d2g
.
node_by_idx
(
int
(
obj
))
elif
type
(
obj
)
==
int
:
obj
=
self
.
d2g
.
node_by_idx
(
obj
)
lcp
=
obj
lcp
=
self
.
d2g
.
get_lcp
(
obj
)
# Update the covering variables
for
barcode_edge
in
lcp
.
edges
:
...
...
@@ -29,28 +23,40 @@ class Path(list):
super
(
Path
,
self
).
append
(
lcp
)
def
pop
(
self
,
index
=-
1
):
lcp
=
super
(
Path
,
self
).
pop
(
index
)
# Update the covering variables
for
barcode_edge
in
lcp
.
edges
:
edge_idx
=
self
.
d2g
.
barcode_edge_idxs
[
barcode_edge
]
self
.
covering_variables
[
edge_idx
]
-=
1
if
self
.
covering_variables
[
edge_idx
]
==
0
:
self
.
covering_value
-=
1
return
lcp
def
copy
(
self
):
copy
=
Path
(
self
.
d2g
)
# Copy the list
for
x
in
self
:
super
(
Path
,
copy
).
append
(
x
)
# Copy the variables
for
key
,
val
in
self
.
covering_variables
.
items
():
copy
.
covering_variables
[
key
]
=
val
copy
.
covering_value
=
self
.
covering_value
return
copy
def
add_path
(
self
,
path
):
for
lcp
in
path
:
self
.
append
(
lcp
)
def
covering_score
(
self
):
return
self
.
covering_value
/
len
(
self
.
covering_variables
)
""" Count the number of variable covered by path that are not self covered.
"""
def
covering_difference
(
self
,
path
):
self_covered
=
[
var
for
var
,
num
in
self
.
covering_variables
.
items
()
if
num
>
0
]
self_covered
=
frozenset
(
self_covered
)
num_covered
=
0
for
var
,
val
in
path
.
covering_variables
.
items
():
if
val
>
0
and
var
not
in
self_covered
:
num_covered
+=
1
return
num_covered
def
save_path
(
self
,
filename
):
def
save_gexf
(
self
,
filename
):
d2p
=
nx
.
Graph
()
# Add the nodes
for
udg
in
self
:
...
...
@@ -70,32 +76,3 @@ class Path(list):
nx
.
write_gexf
(
d2p
,
filename
)
def
save_path_in_graph
(
self
,
filename
):
d2c
=
self
.
d2g
.
clone
()
for
idx
,
udg
in
enumerate
(
self
):
d2c
.
nodes
[
str
(
udg
.
idx
)][
"path"
]
=
idx
nx
.
write_gexf
(
d2c
,
filename
)
class
Unitig
(
Path
):
def
__init__
(
self
,
d2g
):
super
(
Unitig
,
self
).
__init__
(
d2g
)
def
add_right
(
self
,
udg
):
self
.
add_path
([
udg
])
def
d2_path_to_barcode_path
(
path
):
barcode_per_idx
=
[
set
(
udg
.
to_list
())
for
udg
in
path
]
diff_barcode_per_idx
=
[]
rev_diff_barcode_per_idx
=
[]
for
idx
in
range
(
len
(
barcode_per_idx
)
-
1
):
diff_barcode_per_idx
.
append
(
barcode_per_idx
[
idx
]
-
barcode_per_idx
[
idx
+
1
])
rev_diff_barcode_per_idx
.
append
(
barcode_per_idx
[
idx
+
1
]
-
barcode_per_idx
[
idx
])
diff_barcode_per_idx
.
append
(
barcode_per_idx
[
-
1
]
-
diff_barcode_per_idx
[
-
1
])
rev_diff_barcode_per_idx
.
insert
(
0
,
barcode_per_idx
[
0
]
-
rev_diff_barcode_per_idx
[
0
])
for
diff
,
rev_diff
in
zip
(
diff_barcode_per_idx
,
rev_diff_barcode_per_idx
):
print
(
diff
,
rev_diff
)
deconvolution/d2graph/path_optimization.py
View file @
678f0477
import
random
import
networkx
as
nx
from
collections
import
Counter
from
deconvolution.d2graph.d2_path
import
Path
...
...
@@ -9,38 +8,26 @@ class Optimizer:
def
__init__
(
self
,
d2g
):
self
.
d2g
=
d2g
self
.
solutions
=
[]
def
init_random_solutions
(
self
,
nb_solutions
):
for
_
in
range
(
nb_solutions
):
rnd_sol
=
Solution
(
self
.
d2g
)
rnd_sol
.
random_init_best_quality
()
self
.
solutions
.
append
(
rnd_sol
)
def
bb_solution
(
self
,
verbose
=
False
):
def
bb_solution
(
self
,
start_node
=
None
,
verbose
=
False
):
nb_steps_since_last_score_up
=
0
total_nb_steps
=
0
first_pass
=
True
max_cov
=
set
()
for
node
in
self
.
d2g
.
nodes
():
v
=
self
.
d2g
.
get_covering_variables
(
node
)
max_cov
.
update
(
v
)
max_cov
=
len
(
max_cov
)
used_nodes
=
{
n
:
False
for
n
in
self
.
d2g
.
nodes
()}
coverage
=
Counter
()
used_nodes
=
{
n
:
False
for
n
in
self
.
d2g
.
nodes
()}
# Init solution for
init
=
list
(
used_nodes
.
keys
())
init
=
init
[
random
.
randint
(
0
,
len
(
init
)
-
1
)]
if
start_node
==
None
:
start_node
=
list
(
used_nodes
.
keys
())
start_node
=
start_node
[
random
.
randint
(
0
,
len
(
start_node
)
-
1
)]
best_path
=
[]
best_score
=
0
stack
=
[[
init
]]
current_path
=
[]
best_path
=
Path
(
self
.
d2g
)
stack
=
[[
start_node
]]
current_path
=
Path
(
self
.
d2g
)
last_increase_node
=
None
max_cov
=
len
(
current_path
.
covering_variables
)
while
len
(
stack
)
>
0
:
total_nb_steps
+=
1
# 1 - Get next node to extend
...
...
@@ -50,19 +37,15 @@ class Optimizer:
# 2.1 - Add the node
used_nodes
[
current_node
]
=
True
current_path
.
append
(
current_node
)
# 2.2 - Compute the coverage score
vars
=
self
.
d2g
.
get_covering_variables
(
current_node
)
coverage
+=
Counter
(
vars
)
# 2.3 - If best, save
if
len
(
coverage
)
>
best_scor
e
:
if
current_path
.
covering_value
>
best_path
.
covering_valu
e
:
nb_steps_since_last_score_up
=
0
best_path
=
current_path
.
copy
()
best_score
=
len
(
coverage
)
last_increase_node
=
current_node
if
verbose
:
print
(
f
"New best:
{
len
(
best_path
)
}
{
best_
scor
e
}
/
{
max_cov
}
"
)
print
(
f
"New best:
{
len
(
best_path
)
}
{
best_
path
.
covering_valu
e
}
/
{
max_cov
}
"
)
if
len
(
coverage
)
==
max_cov
:
if
best_path
.
covering_value
==
max_cov
:
print
(
"Max coverage"
)
return
best_path
else
:
...
...
@@ -74,33 +57,31 @@ class Optimizer:
def
node_sorting_value
(
node
):
u
=
opt
.
d2g
.
node_by_idx
[
int
(
node
)]
return
(
0
if
len
(
Counter
(
opt
.
d2g
.
get_covering_variables
(
node
))
-
c
overage
)
==
0
else
1
,
return
(
0
if
len
(
opt
.
d2g
.
get_covering_variables
(
node
))
-
c
urrent_path
.
covering_value
==
0
else
1
,
-
u
.
get_link_divergence
(),
-
self
.
d2g
[
str
(
u
.
idx
)][
str
(
current_node
)][
"distance"
])
-
opt
.
d2g
[
str
(
u
.
idx
)][
str
(
current_node
)][
"distance"
])
neighbors
.
sort
(
key
=
node_sorting_value
)
stack
.
append
(
neighbors
)
# 4 - No more neighbors here - back up
while
len
(
stack
[
-
1
])
==
0
:
stack
.
pop
()
if
len
(
stack
)
==
0
:
break
previous
=
current_path
.
pop
()
used_nodes
[
previous
]
=
False
prev_vars
=
self
.
d2g
.
get_covering_variables
(
previous
)
coverage
-=
Counter
(
prev_vars
)
# 5 - Reinit the search from a side and stop when done
if
total_nb_steps
>
10000
and
nb_steps_since_last_score_up
>=
total_nb_steps
/
2
:
if
len
(
stack
)
==
0
or
(
total_nb_steps
>
10000
and
nb_steps_since_last_score_up
>=
total_nb_steps
/
2
)
:
if
first_pass
:
used_nodes
=
{
n
:
False
for
n
in
self
.
d2g
.
nodes
()}
coverage
=
Counter
()
best_path
=
[]
best_score
=
0
best_path
=
Path
(
self
.
d2g
)
stack
=
[[
last_increase_node
]]
current_path
=
[]
current_path
=
Path
(
self
.
d2g
)
if
verbose
:
print
(
"Start again !"
)
import
time
time
.
sleep
(
3
)
time
.
sleep
(
1
)
first_pass
=
False
total_nb_steps
=
0
nb_steps_since_last_score_up
=
0
...
...
@@ -108,71 +89,3 @@ class Optimizer:
return
best_path
return
best_path
def
extends_until_end
(
self
,
solution
):
while
self
.
extends
(
solution
):
continue
solution
.
reverse
()
while
self
.
extends
(
solution
):
continue
def
extends
(
self
,
solution
):
# Get all the neighbors
cur_id
=
str
(
solution
[
-
1
].
idx
)
neighbors
=
[
self
.
d2g
.
node_by_idx
[
int
(
x
)]
for
x
in
self
.
d2g
.
neighbors
(
cur_id
)
if
self
.
d2g
.
node_by_idx
[
int
(
x
)]
not
in
solution
]
current_vars
=
frozenset
([
x
for
x
,
y
in
solution
.
covering_variables
.
items
()
if
y
>
0
])
if
len
(
neighbors
)
==
0
:
return
False
# Choose using the multiple optimization directions
next_udg
=
min
(
neighbors
,
key
=
lambda
x
:
(
1
if
len
(
self
.
d2g
.
get_covering_variables
(
x
)
-
current_vars
)
==
0
else
0
,
x
.
get_link_divergence
(),
self
.
d2g
[
str
(
x
.
idx
)][
cur_id
][
"distance"
]))
solution
.
add_path
([
next_udg
])
return
True
class
Solution
(
Path
):
def
__init__
(
self
,
d2g
):
super
(
Solution
,
self
).
__init__
(
d2g
)
def
random_init_best_quality
(
self
):
nodes
=
[
self
.
d2g
.
node_by_idx
[
int
(
x
)]
for
x
in
list
(
self
.
d2g
.
nodes
())]
min_div
=
(
min
(
nodes
,
key
=
lambda
x
:
x
.
get_link_divergence
())).
get_link_divergence
()
nodes
=
[
x
for
x
in
nodes
if
x
.
get_link_divergence
()
==
min_div
]
random_udg
=
random
.
choice
(
nodes
)
self
.
clear
()
self
.
add_path
([
random_udg
])
""" Only respect counts for now
"""
def
to_barcode_path
(
self
):
barcode_per_position
=
[
set
(
udg
.
to_sorted_list
())
for
udg
in
self
]
compressed_barcodes
=
[]
for
idx
,
barcodes
in
enumerate
(
barcode_per_position
):
for
barcode
in
barcodes
:
offset
=
1
while
idx
+
offset
<
len
(
barcode_per_position
)
and
barcode
in
barcode_per_position
[
idx
+
offset
]:
barcode_per_position
[
idx
+
offset
].
remove
(
barcode
)
offset
+=
1
compressed_barcodes
.
append
(
barcode
)
return
compressed_barcodes
def
save_barcode_path
(
self
,
filename
):
barcodes
=
self
.
to_barcode_path
()
G
=
nx
.
Graph
()
for
idx
,
barcode
in
enumerate
(
barcodes
):
G
.
add_node
(
idx
)
G
.
nodes
[
idx
][
"center"
]
=
barcode
nx
.
write_gexf
(
G
,
filename
)
deconvolution/main/d2_to_path.py
View file @
678f0477
...
...
@@ -43,24 +43,12 @@ def main():
# Start optimization
optimizer
=
po
.
Optimizer
(
largest_component
)
nodes
=
optimizer
.
bb_solution
(
verbose
=
args
.
verbose
)
solution
=
po
.
Solution
(
largest_component
)
solution
.
add_path
([
largest_component
.
node_by_idx
[
int
(
n
)]
for
n
in
nodes
])
# optimizer.init_random_solutions(1)
path
=
optimizer
.
bb_solution
(
verbose
=
args
.
verbose
)
# solution = optimizer.solutions[0]
# print("Solution creation...")
# optimizer.extends_until_end(solution)
print
(
f
"covering score:
{
solution
.
covering_score
()
}
"
)
#
# solution.save_path_in_graph(f"{args.out_prefix}_d2_path.gexf")
solution
.
save_path
(
f
"
{
args
.
out_prefix
}
_path.gexf"
)
# solution.save_barcode_path(f"{args.out_prefix}_barcode_count.gexf")
print
(
f
"covering score:
{
path
.
covering_score
()
}
"
)
path
.
save_gexf
(
f
"
{
args
.
out_prefix
}
_path.gexf"
)
print
(
"Solution saved"
)
# from d2_path import d2_path_to_barcode_path
# d2_path_to_barcode_path(solution)
if
__name__
==
"__main__"
:
main
()
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment