From 7eff7c24687dde8125f23a51a7739fec23b05b72 Mon Sep 17 00:00:00 2001
From: Amandine PERRIN <amandine.perrin@pasteur.fr>
Date: Mon, 5 Aug 2019 15:17:01 +0200
Subject: [PATCH] More comments on pool for annotation step
---
.../prokka_prodigal_functions.py | 50 ++++++++++++++++---
1 file changed, 43 insertions(+), 7 deletions(-)
diff --git a/PanACoTA/annotate_module/prokka_prodigal_functions.py b/PanACoTA/annotate_module/prokka_prodigal_functions.py
index 58bc450d..21e1169b 100755
--- a/PanACoTA/annotate_module/prokka_prodigal_functions.py
+++ b/PanACoTA/annotate_module/prokka_prodigal_functions.py
@@ -51,8 +51,9 @@ def run_annotation_all(genomes, threads, force, annot_folder, prodigal_only=Fals
-------
dict
{genome: boolean} -> with True if prokka/prodigal ran well, False otherwise.
- # """
+ """
+ # Update information according to the annotation software used, and write message
if prodigal_only:
message = "Annotating all genomes with prodigal"
run_annot = run_prodigal
@@ -62,16 +63,21 @@ def run_annotation_all(genomes, threads, force, annot_folder, prodigal_only=Fals
run_annot = run_prokka
main_logger = logging.getLogger("qc_annotate.prokka")
main_logger.info(message)
+ # Get total number of genomes to annotate, used to show annotation progress
nbgen = len(genomes)
bar = None
+ # If user did not ask for quiet, create progressbar
if not quiet:
- # Create progressbar
+ # Create progress bar
widgets = ['Annotation: ', progressbar.Bar(marker='█', left='', right=''),
' ', progressbar.Counter(), "/{}".format(nbgen), ' (',
progressbar.Percentage(), ') - ', progressbar.Timer(), ' - '
]
bar = progressbar.ProgressBar(widgets=widgets, max_value=nbgen,
term_width=100).start()
+ # Get resource availability:
+ # - number of threads used by prokka/prodigal (cores_annot)
+ # - how many genomes can be annotated at the same time (pool_size)
if threads <= 3:
cores_annot = threads
pool_size = 1
@@ -84,33 +90,60 @@ def run_annotation_all(genomes, threads, force, annot_folder, prodigal_only=Fals
else:
cores_annot = 2
pool_size = int(threads / cores_annot)
+ # Create pool with a given size (=number of tasks to be launched in parallel)
pool = multiprocessing.Pool(pool_size)
# Create a Queue to put logs from processes, and handle them after from a single thread
m = multiprocessing.Manager()
q = m.Queue()
- # {genome: [name, gpath_cut_gembase, size, nbcont, l90]}
- # arguments : (gpath, annot_folder, threads, name, force, nbcont, q) for each genome
- arguments = [(genomes[g][1], annot_folder, cores_annot, genomes[g][0],
- force, genomes[g][3], small, q)
+ # {genome: [gembase_name, path_to_origfile, path_toannotate_file, gsize, nbcont, L90]}
+ # arguments: gpath, prok_folder, threads, name, force, nbcont, small(for prodigal), q
+ arguments = [(genomes[g][2], annot_folder, cores_annot, genomes[g][0],
+ force, genomes[g][4], small, q)
for g in sorted(genomes)]
try:
+ # Start pool (run 'run_annot' on each set of arguments)
final = pool.map_async(run_annot, arguments, chunksize=1)
+ # Close pool: no more data will be put on this pool
pool.close()
# Listen for logs in processes
lp = threading.Thread(target=utils.logger_thread, args=(q,))
lp.start()
if not quiet:
while True:
+ # Final is ready when all pool elements are done
if final.ready():
break
+ # If not done, get number of genomes left
remaining = final._number_left
+ # Update progress bar
bar.update(nbgen - remaining)
+ # End progress bar
bar.finish()
pool.join()
+ # Put None to tell 'q' that everything is finished. It can then be stopped and joined.
q.put(None)
+ # Join lp (tell it to stop once all log processes are done, which is the case here)
lp.join()
final = final.get()
- # If an error occurs, terminate pool and exit
+
+ # # If user stops the program (ctrl+C), end it
+ # except KeyboardInterrupt as ki:
+ # print("error")
+ # for worker in pool._pool:
+ # print(worker.is_alive())
+ # pool.join()
+ # print("closed")
+ # pool.terminate()
+ # print("--------------terminate ok----------------")
+ # lp.join()
+ # print("thread stopped")
+ # # run_event.clear()
+ # # lp.terminate()
+ # # print("--------------JOIN--------------")
+ # # pool.terminate()
+ # main_logger.error("Process killed by CTRL+C")
+ # return "coucou"
+ # If an error occurs, terminate pool, write error and exit
except Exception as excp: # pragma: no cover
pool.terminate()
main_logger.error(excp)
@@ -153,10 +186,13 @@ def run_prokka(arguments):
root.addHandler(qh)
logger = logging.getLogger('run_prokka')
logger.log(utils.detail_lvl(), "Start annotating {} {} with Prokka".format(name, gpath))
+
# Define prokka directory and logfile, and check their existence
prok_dir = os.path.join(prok_folder, os.path.basename(gpath) + "-prokkaRes")
fnull = open(os.devnull, 'w')
prok_logfile = os.path.join(prok_folder, os.path.basename(gpath) + "-prokka.log")
+ # import sys
+ # sys.exit(1)
# If result dir already exists, check if we can use it or next step or not
if os.path.isdir(prok_dir) and not force:
logger.warning(("Prokka results folder already exists.").format(prok_dir))
--
GitLab