diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py
index 93e24e38b05bf69e957e2f5ec6229110d316af54..be1301e0c8cda885825a9623c818d38e2f5c6d52 100644
--- a/src/pypelines/__init__.py
+++ b/src/pypelines/__init__.py
@@ -1,4 +1,4 @@
-__version__ = "0.0.45"
+__version__ = "0.0.46"
 
 from . import loggs
 from .pipes import *
diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py
index bd5059ecb356e74fd9be6a2a9394b559ba9781bd..516544d43224a27632d5ed2c203f42dd304a681c 100644
--- a/src/pypelines/celery_tasks.py
+++ b/src/pypelines/celery_tasks.py
@@ -51,7 +51,6 @@ class CeleryAlyxTaskManager(BaseStepTaskManager):
                 try:
                     session = task.get_session()
                     application = task.get_application()
-                    arguments = task.arguments
 
                     with LogTask(task) as log_object:
                         logger = log_object.logger
@@ -63,13 +62,8 @@ class CeleryAlyxTaskManager(BaseStepTaskManager):
                             step: "BaseStep" = (
                                 application.pipelines[task.pipeline_name].pipes[task.pipe_name].steps[task.step_name]
                             )
-                            if arguments.get("refresh", False) or arguments.get("refresh_requirements", []):
-                                skip = False
-                            else:
-                                skip = True
-                                arguments.pop(skip)
 
-                            step.generate(session, extra=extra, skip=skip, check_requirements=True, **task.arguments)
+                            step.generate(session, extra=extra, **task.arguments, **task.management_arguments)
                             task.status_from_logs(log_object)
                         except Exception as e:
                             traceback_msg = format_traceback_exc()
@@ -157,8 +151,42 @@ class CeleryTaskRecord(dict):
 
     @property
     def arguments(self):
+        """Step arguments of the task record, without the management arguments.
+
+        Keys reported by ``management_arguments`` are filtered out, so that both dictionaries can be
+        unpacked into a single ``step.generate`` call without duplicated keyword arguments.
+        """
+        # Once step arguments are controlled via a file, these should take priority over the
+        # file arguments of the main step being run.
         args = self.get("arguments", {})
-        return args if args else {}
+        args = args if args else {}
+        management_keys = self.management_arguments.keys()
+        return {key: value for key, value in args.items() if key not in management_keys}
+
+    @property
+    def management_arguments(self):
+        """Arguments steering how the step is run, with defaults for the missing ones.
+
+        Returns:
+            dict: values of ``skip``, ``refresh``, ``refresh_requirements``, ``check_requirements``
+            and ``save_output``, read from the record's ``arguments`` entry or defaulted.
+        """
+        default_management_args = {
+            "skip": True,
+            "refresh": False,
+            "refresh_requirements": False,
+            "check_requirements": True,
+            "save_output": True,
+        }
+        # The stored value may be None or empty (the ``arguments`` property guards against it too).
+        args = self.get("arguments", {}) or {}
+        management_args = {key: args.get(key, default) for key, default in default_management_args.items()}
+
+        # Refresh overrides skip : skipping is disabled whenever a refresh is requested.
+        if management_args["refresh"]:
+            management_args["skip"] = False
+
+        return management_args
 
     @property
     def session_path(self) -> str:
@@ -310,12 +338,36 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
                 files.append(file_loc)
         return files
 
-    def get_signature_string(signature):
+    def get_signature_as_string(signature):
         params = [
             param_value for param_name, param_value in signature.parameters.items() if param_name not in ["session"]
         ]
         return str(signature.replace(parameters=params))[1:-1].replace(" *,", "")
 
+    def get_signature_as_dict(signature_string):
+        """Parse the ``name=default`` pairs of a signature string into a dictionary.
+
+        Args:
+            signature_string (str): signature text, as returned by ``get_signature_as_string``.
+
+        Returns:
+            dict: default value of each parameter found. Defaults that are not python literals
+            are kept as their raw string instead of raising.
+        """
+        from re import compile as re_compile
+        from ast import literal_eval
+
+        signature_pattern = re_compile(r" *(?P<key> *\w+) *= *(?P<value>.*?) *(?=(?:(?:, *\w+ *=)|(?:$)))")
+        data = {}
+        for key, value in signature_pattern.findall(signature_string):
+            try:
+                data[key] = literal_eval(value)
+            except (ValueError, TypeError, SyntaxError):
+                # Not a python literal (object repr, bare name, ...) : keep the raw text, so that a
+                # single exotic default does not abort the processing of the whole signature.
+                data[key] = value
+        return data
+
     class Handshake(Task):
         name = f"{app_name}.handshake"
 
@@ -340,10 +392,12 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
                 for pipe in pipeline.pipes.values():
                     for step in pipe.steps.values():
                         if step.complete_name in app.tasks.keys():
-                            sig = get_signature_string(step.generate.__signature__)
+                            str_sig = get_signature_as_string(step.generate.__signature__)
+                            dict_sig = get_signature_as_dict(str_sig)
                             doc = step.generate.__doc__
                             task_data = {
-                                "signature": sig,
+                                "signature": str_sig,
+                                "signature_dict": dict_sig,
                                 "docstring": doc,
                                 "step_name": step.step_name,
                                 "pipe_name": step.pipe_name,