Skip to content
GitLab
Projects
Groups
Snippets
/
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
Yoann DUFRESNE
linked reads molecule ordering
Commits
99fef3be
Commit
99fef3be
authored
May 04, 2020
by
rchikhi
Browse files
evaluation script of molecule-generated d2 graphs
parent
a304859c
Changes
1
Hide whitespace changes
Inline
Side-by-side
deconvolution/main/d2_path_evaluation.py
0 → 100755
View file @
99fef3be
#!/usr/bin/env python3
"""
purpose:
take a D2 graph generated from a graph where we know the molecule positions on the genome
compute quality metrics on this d2 graph
"""
import
sys
import
argparse
from
termcolor
import
colored
import
networkx
as
nx
def
parse_args
():
parser
=
argparse
.
ArgumentParser
(
description
=
'Process some integers.'
)
parser
.
add_argument
(
'filename'
,
type
=
str
,
help
=
'The file to evalute'
)
args
=
parser
.
parse_args
()
return
args
def
load_graph
(
filename
):
if
filename
.
endswith
(
'.graphml'
):
return
nx
.
read_graphml
(
filename
)
elif
filename
.
endswith
(
'.gexf'
):
return
nx
.
read_gexf
(
filename
)
else
:
print
(
"Wrong file format. Require graphml or gefx format"
,
file
=
sys
.
stderr
)
exit
()
import
random
def
findRandomPath
(
G
,
u
,
n
):
if
n
==
0
:
return
[
u
]
path
=
[
u
]
poss_neigh
=
list
(
G
.
neighbors
(
u
))
while
u
in
path
:
if
len
(
poss_neigh
)
==
0
:
return
None
neighbor
=
random
.
choice
(
poss_neigh
)
poss_neigh
.
remove
(
neighbor
)
path
=
findRandomPath
(
G
,
neighbor
,
n
-
1
)
if
path
is
None
:
return
None
return
[
u
]
+
path
import
itertools
def
is_there_path
(
central_nodes
,
overlap_length
):
for
mols
in
itertools
.
product
(
*
central_nodes
):
#print(mols)
last_end
=
None
good_path
=
True
for
mol
in
mols
:
start
,
end
=
mol
if
last_end
is
None
:
last_end
=
end
else
:
if
start
>
last_end
-
overlap_length
:
#print("bad path",mols)
good_path
=
False
break
if
good_path
:
return
True
return
False
def
central_node_to_molecules
(
nodestr
):
# format for a 2-merge: 1:NC_000913.3_298281_313280_0:0:0_0:0:0_2fb/1_NC_000913.3_338611_353610_0:0:0_0:0:0_37b/1
cur_node_mols
=
[]
for
ide
in
nodestr
.
split
(
'NC_'
)[
1
:]:
# specific to e coli
#print(ide)
x
=
ide
.
split
(
"_"
)
start
,
end
=
int
(
x
[
1
]),
int
(
x
[
2
])
cur_node_mols
+=
[(
start
,
end
)]
return
cur_node_mols
def
is_coherent_path
(
central_nodes
,
overlap_length
):
mols
=
[]
for
node
in
central_nodes
:
cur_node_mols
=
central_node_to_molecules
(
node
)
mols
+=
[
cur_node_mols
]
return
is_there_path
(
mols
,
overlap_length
)
graph
=
None
def
evaluate_accuracy_paths
(
path_len
,
overlap_length
=
7000
,
max_paths_per_node
=
100
):
global
graph
nb_bad_paths
=
0
nb_good_paths
=
0
for
node
in
graph
.
nodes
():
nb_paths
=
0
for
_
in
range
(
max_paths_per_node
):
path
=
findRandomPath
(
graph
,
node
,
path_len
)
if
path
is
None
:
continue
#print("path",path)
central_nodes
=
[
graph
.
nodes
[
x
][
'udg'
].
split
()[
0
]
for
x
in
path
]
#print(path,central_nodes)
if
is_coherent_path
(
central_nodes
,
overlap_length
):
nb_good_paths
+=
1
else
:
nb_bad_paths
+=
1
print
(
"accuracy for l=%d:"
%
path_len
,
nb_good_paths
/
(
nb_good_paths
+
nb_bad_paths
))
def
is_there_path
(
graph
,
molecules_to_nodes
,
sought_path
):
possible_central_nodes
=
[]
for
mol
in
sought_path
:
possible_central_nodes
+=
[
molecules_to_nodes
[
mol
]]
#print(possible_central_nodes)
for
mols
in
itertools
.
product
(
*
possible_central_nodes
):
if
nx
.
is_connected
(
graph
.
subgraph
(
mols
)):
#print("found connected path",mols)
return
True
print
(
"found no connected paths"
,
sought_path
)
return
False
def
evaluate_sensitivity_paths
(
path_len
,
overlap_length
=
7000
):
all_molecules
=
set
()
from
collections
import
defaultdict
molecules_to_nodes
=
defaultdict
(
list
)
for
node
in
graph
.
nodes
():
central_node
=
graph
.
nodes
[
node
][
'udg'
].
split
()[
0
]
mols
=
central_node_to_molecules
(
central_node
)
all_molecules
|=
set
(
mols
)
for
mol
in
mols
:
molecules_to_nodes
[
mol
]
+=
[
node
]
all_molecules
=
sorted
(
all_molecules
)
print
(
"got"
,
len
(
all_molecules
),
"molecules total"
)
nb_found
,
nb_not_found
=
0
,
0
for
i
in
range
(
len
(
all_molecules
)
-
path_len
+
1
):
sought_path
=
all_molecules
[
i
:
i
+
path_len
]
if
is_there_path
(
graph
,
molecules_to_nodes
,
sought_path
):
nb_found
+=
1
else
:
nb_not_found
+=
1
print
(
"sensitivity for l=%d"
%
path_len
,
nb_found
/
(
nb_not_found
+
nb_found
))
from
multiprocessing
import
Pool
def
main
():
global
graph
args
=
parse_args
()
graph
=
load_graph
(
args
.
filename
)
p
=
Pool
(
4
)
#p.map(evaluate_accuracy_paths, [1,2,3,4])
p
.
map
(
evaluate_sensitivity_paths
,
[
1
,
2
,
3
,
4
])
if
__name__
==
"__main__"
:
main
()
Write
Preview
Supports
Markdown
0%
Try again
or
attach a new file
.
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment