Skip to content
Snippets Groups Projects
Commit 77af8432 authored by Timothe Jost's avatar Timothe Jost
Browse files

fixing some errors

parent b4510e62
No related branches found
No related tags found
No related merge requests found
Pipeline #127820 passed
__version__ = "0.0.50" __version__ = "0.0.51"
from . import loggs from . import loggs
from .pipes import * from .pipes import *
......
...@@ -443,10 +443,13 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ...@@ -443,10 +443,13 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
registered_tasks = self.control.inspect().registered_tasks() registered_tasks = self.control.inspect().registered_tasks()
workers = [] workers = []
task_names = [] task_names = []
for worker, tasks in registered_tasks.items(): if registered_tasks:
workers.append(worker) for worker, tasks in registered_tasks.items():
for task in tasks: workers.append(worker)
task_names.append(task) for task in tasks:
task_names.append(task)
return {"workers": workers, "task_names": task_names}
def get_celery_app_tasks(self, refresh=False): def get_celery_app_tasks(self, refresh=False):
...@@ -503,6 +506,15 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ...@@ -503,6 +506,15 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
) )
return task_record return task_record
def is_hand_shaken(self):
try:
result = self.tasks[f"{app_name}.handshake"].delay(app_name).get(timeout=1)
logger.warning(f"Handshake result : {result}")
return True
except ValueError:
logger.error("No handshake result. All workers are busy ?")
return False
settings_files = get_setting_files_path(conf_path) settings_files = get_setting_files_path(conf_path)
if len(settings_files) == 0: if len(settings_files) == 0:
...@@ -563,6 +575,7 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ...@@ -563,6 +575,7 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
app.get_remote_tasks = MethodType(get_remote_tasks, app) # type: ignore app.get_remote_tasks = MethodType(get_remote_tasks, app) # type: ignore
app.get_celery_app_tasks = MethodType(get_celery_app_tasks, app) # type: ignore app.get_celery_app_tasks = MethodType(get_celery_app_tasks, app) # type: ignore
app.launch_named_task_remotely = MethodType(launch_named_task_remotely, app) # type: ignore app.launch_named_task_remotely = MethodType(launch_named_task_remotely, app) # type: ignore
app.is_hand_shaken = MethodType(is_hand_shaken, app) # type: ignore
logger.info(f"The celery app {app_name} was created successfully.") logger.info(f"The celery app {app_name} was created successfully.")
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment