From 56e60eb720b68f46bc04a68105b7994041020e63 Mon Sep 17 00:00:00 2001 From: Timothe Jost <timothe.jost@wanadoo.fr> Date: Thu, 4 Apr 2024 23:45:06 +0200 Subject: [PATCH] fixin an issue in is_hand_shaken and adding options to get_celery_app_tasks --- src/pypelines/__init__.py | 2 +- src/pypelines/celery_tasks.py | 16 +++++++++------- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index 19bd5f6..3dd76d2 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.52" +__version__ = "0.0.53" from . import loggs from .pipes import * diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py index b974038..8f7c50d 100644 --- a/src/pypelines/celery_tasks.py +++ b/src/pypelines/celery_tasks.py @@ -451,18 +451,20 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | return {"workers": workers, "task_names": task_names} - def get_celery_app_tasks(self, refresh=False): + def get_celery_app_tasks( + self, refresh=False, auto_refresh=3600 * 24, failed_refresh=60 * 5, initial_timeout=10, refresh_timeout=2 + ): from datetime import datetime, timedelta - auto_refresh_time = timedelta(0, (3600 * 24)) # a full day (24 hours of 3600 seconds) - failed_refresh_retry_time = timedelta(0, (60 * 5)) # try to refresh after 5 minutes + auto_refresh_time = timedelta(0, seconds=auto_refresh) # a full day (24 hours of 3600 seconds) + failed_refresh_retry_time = timedelta(0, failed_refresh) # try to refresh after 5 minutes app_task_data = getattr(self, "task_data", None) if app_task_data is None: try: - task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=10) + task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=initial_timeout) # we set timeout to 10 sec if the task data doesn't exist. # It's long to wait for a webpage to load, but sometimes the workers take time to come out of sleep app_task_data = {"task_data": task_data, "refresh_time": datetime.now() + auto_refresh_time} @@ -484,7 +486,7 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | if refresh: try: - task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=2) + task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=refresh_timeout) # if the data needs to be refreshed, we don't wait for as long as for a first get of infos. app_task_data = {"task_data": task_data, "refresh_time": now + auto_refresh_time} logger.warning("Refreshed celery tasks data sucessfully") @@ -511,8 +513,8 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | result = self.tasks[f"{app_name}.handshake"].delay().get(timeout=1) logger.warning(f"Handshake result : {result}") return True - except ValueError: - logger.error("No handshake result. All workers are busy ?") + except Exception as e: + logger.error(f"No handshake result. All workers are busy ? {e}") return False settings_files = get_setting_files_path(conf_path) -- GitLab