Skip to content
Snippets Groups Projects
Commit 56e60eb7 authored by Timothe Jost's avatar Timothe Jost
Browse files

fixin an issue in is_hand_shaken and adding options to get_celery_app_tasks

parent 29698c1a
No related branches found
No related tags found
No related merge requests found
Pipeline #127822 passed
__version__ = "0.0.52" __version__ = "0.0.53"
from . import loggs from . import loggs
from .pipes import * from .pipes import *
......
...@@ -451,18 +451,20 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ...@@ -451,18 +451,20 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
return {"workers": workers, "task_names": task_names} return {"workers": workers, "task_names": task_names}
def get_celery_app_tasks(self, refresh=False): def get_celery_app_tasks(
self, refresh=False, auto_refresh=3600 * 24, failed_refresh=60 * 5, initial_timeout=10, refresh_timeout=2
):
from datetime import datetime, timedelta from datetime import datetime, timedelta
auto_refresh_time = timedelta(0, (3600 * 24)) # a full day (24 hours of 3600 seconds) auto_refresh_time = timedelta(0, seconds=auto_refresh) # a full day (24 hours of 3600 seconds)
failed_refresh_retry_time = timedelta(0, (60 * 5)) # try to refresh after 5 minutes failed_refresh_retry_time = timedelta(0, failed_refresh) # try to refresh after 5 minutes
app_task_data = getattr(self, "task_data", None) app_task_data = getattr(self, "task_data", None)
if app_task_data is None: if app_task_data is None:
try: try:
task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=10) task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=initial_timeout)
# we set timeout to 10 sec if the task data doesn't exist. # we set timeout to 10 sec if the task data doesn't exist.
# It's long to wait for a webpage to load, but sometimes the workers take time to come out of sleep # It's long to wait for a webpage to load, but sometimes the workers take time to come out of sleep
app_task_data = {"task_data": task_data, "refresh_time": datetime.now() + auto_refresh_time} app_task_data = {"task_data": task_data, "refresh_time": datetime.now() + auto_refresh_time}
...@@ -484,7 +486,7 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ...@@ -484,7 +486,7 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
if refresh: if refresh:
try: try:
task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=2) task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=refresh_timeout)
# if the data needs to be refreshed, we don't wait for as long as for a first get of infos. # if the data needs to be refreshed, we don't wait for as long as for a first get of infos.
app_task_data = {"task_data": task_data, "refresh_time": now + auto_refresh_time} app_task_data = {"task_data": task_data, "refresh_time": now + auto_refresh_time}
logger.warning("Refreshed celery tasks data sucessfully") logger.warning("Refreshed celery tasks data sucessfully")
...@@ -511,8 +513,8 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ...@@ -511,8 +513,8 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
result = self.tasks[f"{app_name}.handshake"].delay().get(timeout=1) result = self.tasks[f"{app_name}.handshake"].delay().get(timeout=1)
logger.warning(f"Handshake result : {result}") logger.warning(f"Handshake result : {result}")
return True return True
except ValueError: except Exception as e:
logger.error("No handshake result. All workers are busy ?") logger.error(f"No handshake result. All workers are busy ? {e}")
return False return False
settings_files = get_setting_files_path(conf_path) settings_files = get_setting_files_path(conf_path)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment