diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index 19bd5f6d07fcc3df0756589c1b4b1166428911e1..3dd76d2f2f90f157e98a91273ba5866490f477aa 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.52" +__version__ = "0.0.53" from . import loggs from .pipes import * diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py index b974038dd486e4d58b9cf6fa3d547e973b3b42e3..8f7c50d0a421fd9088dee33a2e704d68a70b1ec3 100644 --- a/src/pypelines/celery_tasks.py +++ b/src/pypelines/celery_tasks.py @@ -451,18 +451,20 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | return {"workers": workers, "task_names": task_names} - def get_celery_app_tasks(self, refresh=False): + def get_celery_app_tasks( + self, refresh=False, auto_refresh=3600 * 24, failed_refresh=60 * 5, initial_timeout=10, refresh_timeout=2 + ): from datetime import datetime, timedelta - auto_refresh_time = timedelta(0, (3600 * 24)) # a full day (24 hours of 3600 seconds) - failed_refresh_retry_time = timedelta(0, (60 * 5)) # try to refresh after 5 minutes + auto_refresh_time = timedelta(0, seconds=auto_refresh) # a full day (24 hours of 3600 seconds) + failed_refresh_retry_time = timedelta(0, failed_refresh) # try to refresh after 5 minutes app_task_data = getattr(self, "task_data", None) if app_task_data is None: try: - task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=10) + task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=initial_timeout) # we set timeout to 10 sec if the task data doesn't exist. # It's long to wait for a webpage to load, but sometimes the workers take time to come out of sleep app_task_data = {"task_data": task_data, "refresh_time": datetime.now() + auto_refresh_time} @@ -484,7 +486,7 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | if refresh: try: - task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=2) + task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=refresh_timeout) # if the data needs to be refreshed, we don't wait for as long as for a first get of infos. app_task_data = {"task_data": task_data, "refresh_time": now + auto_refresh_time} logger.warning("Refreshed celery tasks data sucessfully") @@ -511,8 +513,8 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | result = self.tasks[f"{app_name}.handshake"].delay().get(timeout=1) logger.warning(f"Handshake result : {result}") return True - except ValueError: - logger.error("No handshake result. All workers are busy ?") + except Exception as e: + logger.error(f"No handshake result. All workers are busy ? {e}") return False settings_files = get_setting_files_path(conf_path)