From ef8b1536af9da50178f935816b9665ae19261d39 Mon Sep 17 00:00:00 2001 From: Timothe Jost <timothe.jost@wanadoo.fr> Date: Wed, 27 Mar 2024 21:22:14 +0100 Subject: [PATCH] updating the way to correctly register CeleryRunner task --- src/pypelines/__init__.py | 2 +- src/pypelines/celery_tasks.py | 73 +++++++++++++++++++---------------- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index 4f62a7d..b3776cb 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.34" +__version__ = "0.0.35" from . import loggs from .pipes import * diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py index 06a3b3a..2f19d98 100644 --- a/src/pypelines/celery_tasks.py +++ b/src/pypelines/celery_tasks.py @@ -20,37 +20,47 @@ if TYPE_CHECKING: APPLICATIONS_STORE = {} -def CeleryRunner(task_id, extra=None): +def get_runner(task_name: str): + from celery import Task - task = CeleryTaskRecord(task_id) + class CeleryRunner(Task): + name = task_name - try: - session = task.get_session() - application = task.get_application() + def run(self, task_id, extra=None): - with LogTask(task) as log_object: - logger = log_object.logger - task["log"] = log_object.filename - task["status"] = "Started" - task.partial_update() + task = CeleryTaskRecord(task_id) try: - step: "BaseStep" = application.pipelines[task.pipeline_name].pipes[task.pipe_name].steps[task.step_name] - step.generate(session, extra=extra, skip=True, check_requirements=True, **task.arguments) - task.status_from_logs(log_object) + session = task.get_session() + application = task.get_application() + + with LogTask(task) as log_object: + logger = log_object.logger + task["log"] = log_object.filename + task["status"] = "Started" + task.partial_update() + + try: + step: "BaseStep" = ( + application.pipelines[task.pipeline_name].pipes[task.pipe_name].steps[task.step_name] + ) + step.generate(session, extra=extra, skip=True, check_requirements=True, **task.arguments) + task.status_from_logs(log_object) + except Exception as e: + traceback_msg = format_traceback_exc() + logger.critical(f"Fatal Error : {e}") + logger.critical("Traceback :\n" + traceback_msg) + task["status"] = "Failed" + except Exception as e: - traceback_msg = format_traceback_exc() - logger.critical(f"Fatal Error : {e}") - logger.critical("Traceback :\n" + traceback_msg) - task["status"] = "Failed" + # if it fails outside of the nested try statement, we can't store logs files, + # and we mention the failure through alyx directly. + task["status"] = "Uncatched_Fail" + task["log"] = str(e) - except Exception as e: - # if it fails outside of the nested try statement, we can't store logs files, - # and we mention the failure through alyx directly. - task["status"] = "Uncatched_Fail" - task["log"] = str(e) + task.partial_update() - task.partial_update() + return CeleryRunner class CeleryAlyxTaskManager(BaseStepTaskManager): @@ -60,7 +70,8 @@ class CeleryAlyxTaskManager(BaseStepTaskManager): def register_step(self): if self.backend: - self.backend.app.task(CeleryRunner, name=self.step.complete_name) + # self.backend.app.task(CeleryRunner, name=self.step.complete_name) + self.backend.app.register_task(get_runner(self.step.complete_name)) def start(self, session, extra=None, **kwargs): @@ -339,16 +350,10 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery": APPLICATIONS_STORE[app_name] = app - return app + app.register_task(handshake) + return app -# def create_worker_for_app(app_name): -# from celery.bin.worker import worker as celery_worker -# def start_worker(app): -# worker = celery_worker(app=app) -# options = { -# "loglevel": "INFO", -# "traceback": True, -# } -# worker.run(**options) +def handshake(): + return f"{node()} is happy to shake your hand and says hello !" -- GitLab