diff --git a/PanACoTA/annotate_module/prokka_prodigal_functions.py b/PanACoTA/annotate_module/prokka_prodigal_functions.py index 58bc450da172df8d18b080e853ffdabac65007c4..21e1169b66aaad2732035085b71c3dbe1b7c5a1e 100755 --- a/PanACoTA/annotate_module/prokka_prodigal_functions.py +++ b/PanACoTA/annotate_module/prokka_prodigal_functions.py @@ -51,8 +51,9 @@ def run_annotation_all(genomes, threads, force, annot_folder, prodigal_only=Fals ------- dict {genome: boolean} -> with True if prokka/prodigal ran well, False otherwise. - # """ + """ + # Update information according to annotation soft used and write message if prodigal_only: message = "Annotating all genomes with prodigal" run_annot = run_prodigal @@ -62,16 +63,21 @@ def run_annotation_all(genomes, threads, force, annot_folder, prodigal_only=Fals run_annot = run_prokka main_logger = logging.getLogger("qc_annotate.prokka") main_logger.info(message) + # Get total number of genomes to annotate, used to show annotation progress nbgen = len(genomes) bar = None + # If user did not ask for quiet, create progressbar if not quiet: - # Create progressbar + # Create progress bar widgets = ['Annotation: ', progressbar.Bar(marker='█', left='', right=''), ' ', progressbar.Counter(), "/{}".format(nbgen), ' (', progressbar.Percentage(), ') - ', progressbar.Timer(), ' - ' ] bar = progressbar.ProgressBar(widgets=widgets, max_value=nbgen, term_width=100).start() + # Get resource availability: + # - number of threads used by prokka/prodigal (cores_annot) + # - how many genomes can be annotated at the same time (pool_size) if threads <= 3: cores_annot = threads pool_size = 1 @@ -84,33 +90,60 @@ def run_annotation_all(genomes, threads, force, annot_folder, prodigal_only=Fals else: cores_annot = 2 pool_size = int(threads / cores_annot) + # Create pool with a given size (=number of tasks to be launched in parallel) pool = multiprocessing.Pool(pool_size) # Create a Queue to put logs from processes, and handle them after from a single thread m = multiprocessing.Manager() q = m.Queue() - # {genome: [name, gpath_cut_gembase, size, nbcont, l90]} - # arguments : (gpath, annot_folder, threads, name, force, nbcont, q) for each genome - arguments = [(genomes[g][1], annot_folder, cores_annot, genomes[g][0], - force, genomes[g][3], small, q) + # {genome: [gembase_name, path_to_origfile, path_toannotate_file, gsize, nbcont, L90]} + # arguments: gpath, prok_folder, threads, name, force, nbcont, small(for prodigal), q + arguments = [(genomes[g][2], annot_folder, cores_annot, genomes[g][0], + force, genomes[g][4], small, q) for g in sorted(genomes)] try: + # Start pool (run 'run_annot' n each set of arguments) final = pool.map_async(run_annot, arguments, chunksize=1) + # Close pool: no more data will be put on this pool pool.close() # Listen for logs in processes lp = threading.Thread(target=utils.logger_thread, args=(q,)) lp.start() if not quiet: while True: + # Final is ready when all pool elements are done if final.ready(): break + # If not done, get number of genomes left remaining = final._number_left + # Update progress bar bar.update(nbgen - remaining) + # End progress bar bar.finish() pool.join() + # Put None to tell 'q' that everything is finished. It can stopped and be joined. q.put(None) + # join lp (tell to stop once all log processes are done, which is the case here) lp.join() final = final.get() - # If an error occurs, terminate pool and exit + + # # If user stops programm (ctrl+C), end it + # except KeyboardInterrupt as ki: + # print("error") + # for worker in pool._pool: + # print(worker.is_alive()) + # pool.join() + # print("closed") + # pool.terminate() + # print("--------------terminate ok----------------") + # lp.join() + # print("thread stopped") + # # run_event.clear() + # # lp.terminate() + # # print("--------------JOIN--------------") + # # pool.terminate() + # main_logger.error("Process killed by CTRL+C") + # return "coucou" + # If an error occurs, terminate pool, write error and exit except Exception as excp: # pragma: no cover pool.terminate() main_logger.error(excp) @@ -153,10 +186,13 @@ def run_prokka(arguments): root.addHandler(qh) logger = logging.getLogger('run_prokka') logger.log(utils.detail_lvl(), "Start annotating {} {} with Prokka".format(name, gpath)) + # Define prokka directory and logfile, and check their existence prok_dir = os.path.join(prok_folder, os.path.basename(gpath) + "-prokkaRes") fnull = open(os.devnull, 'w') prok_logfile = os.path.join(prok_folder, os.path.basename(gpath) + "-prokka.log") + # import sys + # sys.exit(1) # If result dir already exists, check if we can use it or next step or not if os.path.isdir(prok_dir) and not force: logger.warning(("Prokka results folder already exists.").format(prok_dir))