From 77af84320ea5d42cbb776716ef67cf161fab030f Mon Sep 17 00:00:00 2001 From: Timothe Jost <timothe.jost@wanadoo.fr> Date: Thu, 4 Apr 2024 23:25:16 +0200 Subject: [PATCH] fixing some errors --- src/pypelines/__init__.py | 2 +- src/pypelines/celery_tasks.py | 21 +++++++++++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index f6c460f..74040b2 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.50" +__version__ = "0.0.51" from . import loggs from .pipes import * diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py index b0f5a17..039cae9 100644 --- a/src/pypelines/celery_tasks.py +++ b/src/pypelines/celery_tasks.py @@ -443,10 +443,13 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | registered_tasks = self.control.inspect().registered_tasks() workers = [] task_names = [] - for worker, tasks in registered_tasks.items(): - workers.append(worker) - for task in tasks: - task_names.append(task) + if registered_tasks: + for worker, tasks in registered_tasks.items(): + workers.append(worker) + for task in tasks: + task_names.append(task) + + return {"workers": workers, "task_names": task_names} def get_celery_app_tasks(self, refresh=False): @@ -503,6 +506,15 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ) return task_record + def is_hand_shaken(self): + try: + result = self.tasks[f"{app_name}.handshake"].delay(app_name).get(timeout=1) + logger.warning(f"Handshake result : {result}") + return True + except ValueError: + logger.error("No handshake result. All workers are busy ?") + return False + settings_files = get_setting_files_path(conf_path) if len(settings_files) == 0: @@ -563,6 +575,7 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | app.get_remote_tasks = MethodType(get_remote_tasks, app) # type: ignore app.get_celery_app_tasks = MethodType(get_celery_app_tasks, app) # type: ignore app.launch_named_task_remotely = MethodType(launch_named_task_remotely, app) # type: ignore + app.is_hand_shaken = MethodType(is_hand_shaken, app) # type: ignore logger.info(f"The celery app {app_name} was created successfully.") -- GitLab