Skip to content
Snippets Groups Projects
Commit ef8b1536 authored by Timothe Jost's avatar Timothe Jost
Browse files

updating the way to correctly register CeleryRunner task

parent 014365fa
No related branches found
No related tags found
No related merge requests found
Pipeline #127176 passed
__version__ = "0.0.34" __version__ = "0.0.35"
from . import loggs from . import loggs
from .pipes import * from .pipes import *
......
...@@ -20,37 +20,47 @@ if TYPE_CHECKING: ...@@ -20,37 +20,47 @@ if TYPE_CHECKING:
APPLICATIONS_STORE = {} APPLICATIONS_STORE = {}
def CeleryRunner(task_id, extra=None): def get_runner(task_name: str):
from celery import Task
task = CeleryTaskRecord(task_id) class CeleryRunner(Task):
name = task_name
try: def run(self, task_id, extra=None):
session = task.get_session()
application = task.get_application()
with LogTask(task) as log_object: task = CeleryTaskRecord(task_id)
logger = log_object.logger
task["log"] = log_object.filename
task["status"] = "Started"
task.partial_update()
try: try:
step: "BaseStep" = application.pipelines[task.pipeline_name].pipes[task.pipe_name].steps[task.step_name] session = task.get_session()
step.generate(session, extra=extra, skip=True, check_requirements=True, **task.arguments) application = task.get_application()
task.status_from_logs(log_object)
with LogTask(task) as log_object:
logger = log_object.logger
task["log"] = log_object.filename
task["status"] = "Started"
task.partial_update()
try:
step: "BaseStep" = (
application.pipelines[task.pipeline_name].pipes[task.pipe_name].steps[task.step_name]
)
step.generate(session, extra=extra, skip=True, check_requirements=True, **task.arguments)
task.status_from_logs(log_object)
except Exception as e:
traceback_msg = format_traceback_exc()
logger.critical(f"Fatal Error : {e}")
logger.critical("Traceback :\n" + traceback_msg)
task["status"] = "Failed"
except Exception as e: except Exception as e:
traceback_msg = format_traceback_exc() # if it fails outside of the nested try statement, we can't store logs files,
logger.critical(f"Fatal Error : {e}") # and we mention the failure through alyx directly.
logger.critical("Traceback :\n" + traceback_msg) task["status"] = "Uncatched_Fail"
task["status"] = "Failed" task["log"] = str(e)
except Exception as e: task.partial_update()
# if it fails outside of the nested try statement, we can't store logs files,
# and we mention the failure through alyx directly.
task["status"] = "Uncatched_Fail"
task["log"] = str(e)
task.partial_update() return CeleryRunner
class CeleryAlyxTaskManager(BaseStepTaskManager): class CeleryAlyxTaskManager(BaseStepTaskManager):
...@@ -60,7 +70,8 @@ class CeleryAlyxTaskManager(BaseStepTaskManager): ...@@ -60,7 +70,8 @@ class CeleryAlyxTaskManager(BaseStepTaskManager):
def register_step(self): def register_step(self):
if self.backend: if self.backend:
self.backend.app.task(CeleryRunner, name=self.step.complete_name) # self.backend.app.task(CeleryRunner, name=self.step.complete_name)
self.backend.app.register_task(get_runner(self.step.complete_name))
def start(self, session, extra=None, **kwargs): def start(self, session, extra=None, **kwargs):
...@@ -339,16 +350,10 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery": ...@@ -339,16 +350,10 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery":
APPLICATIONS_STORE[app_name] = app APPLICATIONS_STORE[app_name] = app
return app app.register_task(handshake)
return app
# def create_worker_for_app(app_name):
# from celery.bin.worker import worker as celery_worker
# def start_worker(app): def handshake():
# worker = celery_worker(app=app) return f"{node()} is happy to shake your hand and says hello !"
# options = {
# "loglevel": "INFO",
# "traceback": True,
# }
# worker.run(**options)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment