Skip to content
Snippets Groups Projects
Commit cb54f364 authored by Timothe Jost's avatar Timothe Jost
Browse files

moving start remotely to steps

parent 72a7f034
Branches
Tags
No related merge requests found
...@@ -15,7 +15,7 @@ class Pipeline: ...@@ -15,7 +15,7 @@ class Pipeline:
self.pipeline_name = name self.pipeline_name = name
self.pipes = {} self.pipes = {}
self.resolved = False self.resolved = False
self.conf_path = os.path.basename(conf_path) if conf_path is not None else None self.conf_path = os.path.dirname(conf_path) if conf_path is not None else None
if use_celery: if use_celery:
self.configure_celery() self.configure_celery()
......
...@@ -411,6 +411,31 @@ class BaseStep: ...@@ -411,6 +411,31 @@ class BaseStep:
def get_arguments(self, session): def get_arguments(self, session):
... ...
def start_remotely(self, session, extra=None, **kwargs):
if not self.pipeline.use_celery:
raise NotImplementedError(
"Cannot use this feature with a pipeline that doesn't have a celery cluster access"
)
from one import ONE
connector = ONE(mode="remote", data_access_mode="remote")
worker = self.pipeline.celery.app.tasks[self.full_name]
task = connector.alyx.rest(
"tasks",
"create",
data={
"session": session.name,
"name": self.full_name,
"arguments": kwargs,
"status": "Waiting",
"executable": self.pipeline.celery.app_name,
},
)
worker(task["id"], extra=extra)
return task
@dataclass @dataclass
class StepLevel: class StepLevel:
......
...@@ -17,7 +17,6 @@ class CeleryHandler: ...@@ -17,7 +17,6 @@ class CeleryHandler:
settings_files = self.get_setting_files_path(conf_path, pipeline_name) settings_files = self.get_setting_files_path(conf_path, pipeline_name)
self.settings = Dynaconf(settings_files=settings_files) self.settings = Dynaconf(settings_files=settings_files)
self.app_name = self.settings.get("app_name", pipeline_name) self.app_name = self.settings.get("app_name", pipeline_name)
print(self.settings)
self.app = Celery( self.app = Celery(
self.app_name, self.app_name,
broker=( broker=(
...@@ -94,35 +93,6 @@ class CeleryHandler: ...@@ -94,35 +93,6 @@ class CeleryHandler:
else: else:
return "Complete" return "Complete"
def start_step_remotely(self, step, session, extra=None, **kwargs):
if isinstance(step, str):
worker = self.app.tasks[step] # step is a step name
name = step
else:
worker = self.app.tasks[step.full_name]
name = step.full_name
from one import ONE
connector = ONE(mode="remote", data_access_mode="remote")
task = TaskRecord(
connector.alyx.rest(
"tasks",
"create",
data={
"session": session.name,
"name": name,
"arguments": kwargs,
"status": "Waiting",
"executable": self.app_name,
},
)
)
worker(task.id, extra=extra)
return task
class TaskRecord(dict): class TaskRecord(dict):
# a class to make dictionnary keys accessible with attribute syntax # a class to make dictionnary keys accessible with attribute syntax
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment