Skip to content
GitLab
Menu
Projects
Groups
Snippets
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Yoann DUFRESNE
linked reads molecule ordering
Commits
5fd16e32
Commit
5fd16e32
authored
May 16, 2020
by
Yoann Dufresne
Browse files
refactor solution class
parent
c653160c
Changes
3
Hide whitespace changes
Inline
Side-by-side
deconvolution/d2graph/d2_graph.py
View file @
5fd16e32
...
...
@@ -20,7 +20,7 @@ class D2Graph(nx.Graph):
self
.
barcode_graph
=
barcode_graph
self
.
index
=
None
self
.
variables_per_
node
=
{}
self
.
variables_per_
lcp
=
{}
# Number the edges from original graph
self
.
barcode_edge_idxs
=
{}
...
...
@@ -91,19 +91,22 @@ class D2Graph(nx.Graph):
self
.
bidict_nodes
=
self
.
create_graph_from_node_neighborhoods
(
neighbor_threshold
)
def
get_covering_variable_node
(
self
,
node
):
if
node
not
in
self
.
variables_per_node
:
udg
=
self
.
node_by_idx
[
node
]
self
.
variables_per_node
[
node
]
=
[
x
for
x
in
self
.
get_covering_variables
(
udg
)]
def
_lcp_from_obj
(
self
,
obj
):
if
type
(
obj
)
==
str
:
obj
=
int
(
obj
)
if
type
(
obj
)
==
int
:
obj
=
self
.
node_by_idx
[
obj
]
return
obj
return
self
.
variables_per_node
[
node
]
def
get_covering_variables
(
self
,
obj
):
lcp
=
self
.
_lcp_from_obj
(
obj
)
if
lcp
not
in
self
.
variables_per_lcp
:
variables
=
[]
for
e
in
lcp
.
edges
:
variables
.
append
(
self
.
barcode_edge_idxs
[
e
])
self
.
variables_per_lcp
[
lcp
]
=
variables
def
get_covering_variables
(
self
,
udg
):
variables
=
[]
for
e
in
udg
.
edges
:
variables
.
append
(
self
.
barcode_edge_idxs
[
e
])
return
frozenset
(
variables
)
return
self
.
variables_per_lcp
[
lcp
]
def
save
(
self
,
filename
):
...
...
deconvolution/d2graph/d2_path.py
View file @
5fd16e32
...
...
@@ -7,46 +7,35 @@ class Path(list):
def
__init__
(
self
,
d2g
):
super
(
Path
,
self
).
__init__
()
self
.
penalty
=
0
self
.
d2g
=
d2g
self
.
covering_variables
=
{
x
:
0
for
x
in
self
.
d2g
.
barcode_edge_idxs
.
values
()}
self
.
covering_value
=
0
def
add_path
(
self
,
udgs
):
if
len
(
udgs
)
==
0
:
return
for
udg
in
udgs
:
# Register edges
for
barcode_edge
in
udg
.
edges
:
edge_idx
=
self
.
d2g
.
barcode_edge_idxs
[
barcode_edge
]
self
.
covering_variables
[
edge_idx
]
+=
1
# Special case for previously empty path
if
len
(
self
)
==
0
:
# 4 because it's the ideal case (1 node of difference with same length on 1 shift.
self
.
penalty
=
4
self
.
append
(
udgs
[
0
])
udgs
=
udgs
[
1
:]
# Add udg one by one
for
udg
in
udgs
:
# Compute distance
dist
=
self
.
d2g
[
str
(
udg
.
idx
)][
str
(
self
[
-
1
].
idx
)][
"distance"
]
# Update penalty regarding distance
self
.
penalty
+=
pow
(
dist
,
2
)
# Add the node
self
.
append
(
udg
)
def
normalized_penalty
(
self
):
return
self
.
penalty
/
len
(
self
)
def
append
(
self
,
obj
)
->
None
:
# Transform the input into a lcp
if
type
(
obj
)
==
str
:
obj
=
self
.
d2g
.
node_by_idx
(
int
(
obj
))
elif
type
(
obj
)
==
int
:
obj
=
self
.
d2g
.
node_by_idx
(
obj
)
def
covering_score
(
self
):
covered_num
=
0
for
val
in
self
.
covering_variables
.
values
():
if
val
>
0
:
covered_num
+=
1
lcp
=
obj
# Update the covering variables
for
barcode_edge
in
lcp
.
edges
:
edge_idx
=
self
.
d2g
.
barcode_edge_idxs
[
barcode_edge
]
if
self
.
covering_variables
[
edge_idx
]
==
0
:
self
.
covering_value
+=
1
self
.
covering_variables
[
edge_idx
]
+=
1
super
(
Path
,
self
).
append
(
lcp
)
def
add_path
(
self
,
path
):
for
lcp
in
path
:
self
.
append
(
lcp
)
return
covered_num
/
len
(
self
.
covering_variables
)
def
covering_score
(
self
):
return
self
.
covering_value
/
len
(
self
.
covering_variables
)
""" Count the number of variable covered by path that are not self covered.
"""
...
...
@@ -81,9 +70,6 @@ class Path(list):
nx
.
write_gexf
(
d2p
,
filename
)
# in addition, write the barcode order to a text file
#filename_order_txt = filename[:-len(".gexf")]+".txt"
def
save_path_in_graph
(
self
,
filename
):
d2c
=
self
.
d2g
.
clone
()
for
idx
,
udg
in
enumerate
(
self
):
...
...
deconvolution/d2graph/path_optimization.py
View file @
5fd16e32
...
...
@@ -24,7 +24,7 @@ class Optimizer:
max_cov
=
set
()
for
node
in
self
.
d2g
.
nodes
():
v
=
self
.
d2g
.
get_covering_variable
_node
(
int
(
node
)
)
v
=
self
.
d2g
.
get_covering_variable
s
(
node
)
max_cov
.
update
(
v
)
max_cov
=
len
(
max_cov
)
...
...
@@ -51,7 +51,7 @@ class Optimizer:
used_nodes
[
current_node
]
=
True
current_path
.
append
(
current_node
)
# 2.2 - Compute the coverage score
vars
=
self
.
d2g
.
get_covering_variable
_node
(
int
(
current_node
)
)
vars
=
self
.
d2g
.
get_covering_variable
s
(
current_node
)
coverage
+=
Counter
(
vars
)
# 2.3 - If best, save
if
len
(
coverage
)
>
best_score
:
...
...
@@ -74,7 +74,7 @@ class Optimizer:
def
node_sorting_value
(
node
):
u
=
opt
.
d2g
.
node_by_idx
[
int
(
node
)]
return
(
0
if
len
(
Counter
(
opt
.
d2g
.
get_covering_variable
_node
(
int
(
node
))
)
-
coverage
)
==
0
else
1
,
return
(
0
if
len
(
Counter
(
opt
.
d2g
.
get_covering_variable
s
(
node
))
-
coverage
)
==
0
else
1
,
-
u
.
get_link_divergence
(),
-
self
.
d2g
[
str
(
u
.
idx
)][
str
(
current_node
)][
"distance"
])
neighbors
.
sort
(
key
=
node_sorting_value
)
...
...
@@ -85,7 +85,7 @@ class Optimizer:
stack
.
pop
()
previous
=
current_path
.
pop
()
used_nodes
[
previous
]
=
False
prev_vars
=
self
.
d2g
.
get_covering_variable
_node
(
int
(
previous
)
)
prev_vars
=
self
.
d2g
.
get_covering_variable
s
(
previous
)
coverage
-=
Counter
(
prev_vars
)
# 5 - Reinit the search from a side and stop when done
...
...
@@ -99,11 +99,11 @@ class Optimizer:
current_path
=
[]
if
verbose
:
print
(
"Start again !"
)
import
time
time
.
sleep
(
3
)
first_pass
=
False
total_nb_steps
=
0
nb_steps_since_last_score_up
=
0
import
time
time
.
sleep
(
3
)
else
:
return
best_path
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment