Skip to content
Snippets Groups Projects
Commit 89d7c261 authored by Timothe Jost's avatar Timothe Jost
Browse files

adding a routine to check runner's arguments and pipeline management argument don't collide

parent 641dc81c
No related branches found
No related tags found
No related merge requests found
Pipeline #127750 passed
__version__ = "0.0.45"
__version__ = "0.0.46"
from . import loggs
from .pipes import *
......
......@@ -51,7 +51,6 @@ class CeleryAlyxTaskManager(BaseStepTaskManager):
try:
session = task.get_session()
application = task.get_application()
arguments = task.arguments
with LogTask(task) as log_object:
logger = log_object.logger
......@@ -63,13 +62,8 @@ class CeleryAlyxTaskManager(BaseStepTaskManager):
step: "BaseStep" = (
application.pipelines[task.pipeline_name].pipes[task.pipe_name].steps[task.step_name]
)
if arguments.get("refresh", False) or arguments.get("refresh_requirements", []):
skip = False
else:
skip = True
arguments.pop(skip)
step.generate(session, extra=extra, skip=skip, check_requirements=True, **task.arguments)
step.generate(session, extra=extra, **task.arguments, **task.management_arguments)
task.status_from_logs(log_object)
except Exception as e:
traceback_msg = format_traceback_exc()
......@@ -157,8 +151,34 @@ class CeleryTaskRecord(dict):
@property
def arguments(self):
# once step arguments control will be done via file, these should take prio over the main step ran's file args
args = self.get("arguments", {})
return args if args else {}
args = args if args else {}
management_args = self.management_arguments
filtered_args = {}
for key, value in args.items():
if key not in management_args.keys():
filtered_args[key] = value
return filtered_args
@property
def management_arguments(self):
default_management_args = {
"skip": True,
"refresh": False,
"refresh_requirements": False,
"check_requirements": True,
"save_output": True,
}
args = self.get("arguments", {})
management_args = {}
for key, default_value in default_management_args.items():
management_args[key] = args.get(key, default_value)
if management_args["refresh"] == True:
management_args["skip"] = False
return management_args
@property
def session_path(self) -> str:
......@@ -310,12 +330,24 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
files.append(file_loc)
return files
def get_signature_string(signature):
def get_signature_as_string(signature):
params = [
param_value for param_name, param_value in signature.parameters.items() if param_name not in ["session"]
]
return str(signature.replace(parameters=params))[1:-1].replace(" *,", "")
def get_signature_as_dict(signature_string):
from re import compile as re_compile
from ast import literal_eval
signature_pattern = re_compile(r" *(?P<key> *\w+) *= *(?P<value>.*?) *(?=(?:(?:, *\w+ *=)|(?:$)))")
patt = signature_pattern.findall(signature_string)
data = {}
for key, value in patt:
t_val = literal_eval(value)
data[key] = t_val
return data
class Handshake(Task):
name = f"{app_name}.handshake"
......@@ -340,10 +372,12 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
for pipe in pipeline.pipes.values():
for step in pipe.steps.values():
if step.complete_name in app.tasks.keys():
sig = get_signature_string(step.generate.__signature__)
str_sig = get_signature_as_string(step.generate.__signature__)
dict_sig = get_signature_as_dict(str_sig)
doc = step.generate.__doc__
task_data = {
"signature": sig,
"signature": str_sig,
"signature_dict": dict_sig,
"docstring": doc,
"step_name": step.step_name,
"pipe_name": step.pipe_name,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment