Skip to content
Snippets Groups Projects
Commit b4510e62 authored by Timothe Jost's avatar Timothe Jost
Browse files

test

parent 9303d344
No related branches found
No related tags found
No related merge requests found
Pipeline #127817 passed
__version__ = "0.0.49" __version__ = "0.0.50"
from . import loggs from . import loggs
from .pipes import * from .pipes import *
......
...@@ -471,24 +471,29 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ...@@ -471,24 +471,29 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
# logger.warning(f"Local tasks are : {self.tasks}") # logger.warning(f"Local tasks are : {self.tasks}")
else: else:
if datetime.now() > app_task_data["refresh_time"]: # we refresh if refresh time is elapsed now = datetime.now()
if now > app_task_data["refresh_time"]: # we refresh if refresh time is elapsed
logger.warning(
"Time has come to auto refresh app_task_data. "
f"refresh_time was {app_task_data['refresh_time']} and now is {now}"
)
refresh = True refresh = True
if refresh: if refresh:
try: try:
task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=2) task_data = self.tasks[f"{app_name}.tasks_infos"].delay(app_name).get(timeout=2)
# if the data needs to be refreshed, we don't wait for as long as for a first get of infos. # if the data needs to be refreshed, we don't wait for as long as for a first get of infos.
app_task_data = {"task_data": task_data, "refresh_time": datetime.now() + auto_refresh_time} app_task_data = {"task_data": task_data, "refresh_time": now + auto_refresh_time}
logger.warning("Refreshed celery tasks data sucessfully") logger.warning("Refreshed celery tasks data sucessfully")
except Exception as e: except Exception as e:
logger.warning( logger.warning(
"Could not refresh tasks data from remote celery worker. All workers are is probably running. " "Could not refresh tasks data from remote celery worker. All workers are is probably running. "
f"{e}" f"{e}"
) )
app_task_data["refresh_time"] = datetime.now() + failed_refresh_retry_time app_task_data["refresh_time"] = now + failed_refresh_retry_time
setattr(self, "task_data", app_task_data) setattr(self, "task_data", app_task_data)
else: else:
delta = (app_task_data["refresh_time"] - datetime.now()).total_seconds() delta = (app_task_data["refresh_time"] - now).total_seconds()
logger.warning(f"Returned cached task_data. Next refresh will happen in at least {delta} seconds") logger.warning(f"Returned cached task_data. Next refresh will happen in at least {delta} seconds")
return app_task_data["task_data"] if app_task_data is not None else None return app_task_data["task_data"] if app_task_data is not None else None
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment