diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index f6c460f72aa9ba49742ab654db6844f14611bfa1..74040b269ae91bb8f57b838deedcfa0ebfe86d58 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.50" +__version__ = "0.0.51" from . import loggs from .pipes import * diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py index b0f5a17e90a8e392da1ec26fefde1274e2352880..039cae9b9c0ad6fc42175efda44e9adfbe00d6a3 100644 --- a/src/pypelines/celery_tasks.py +++ b/src/pypelines/celery_tasks.py @@ -443,10 +443,13 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | registered_tasks = self.control.inspect().registered_tasks() workers = [] task_names = [] - for worker, tasks in registered_tasks.items(): - workers.append(worker) - for task in tasks: - task_names.append(task) + if registered_tasks: + for worker, tasks in registered_tasks.items(): + workers.append(worker) + for task in tasks: + task_names.append(task) + + return {"workers": workers, "task_names": task_names} def get_celery_app_tasks(self, refresh=False): @@ -503,6 +506,15 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ) return task_record + def is_hand_shaken(self): + try: + result = self.tasks[f"{app_name}.handshake"].delay(app_name).get(timeout=1) + logger.warning(f"Handshake result : {result}") + return True + except ValueError: + logger.error("No handshake result. All workers are busy ?") + return False + settings_files = get_setting_files_path(conf_path) if len(settings_files) == 0: @@ -563,6 +575,7 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | app.get_remote_tasks = MethodType(get_remote_tasks, app) # type: ignore app.get_celery_app_tasks = MethodType(get_celery_app_tasks, app) # type: ignore app.launch_named_task_remotely = MethodType(launch_named_task_remotely, app) # type: ignore + app.is_hand_shaken = MethodType(is_hand_shaken, app) # type: ignore logger.info(f"The celery app {app_name} was created successfully.")