From 972d814062498ede1a5ec8fa8f736490a8e93d7b Mon Sep 17 00:00:00 2001 From: Timothe Jost <timothe.jost@wanadoo.fr> Date: Wed, 27 Mar 2024 23:34:14 +0100 Subject: [PATCH] update to allow using refresh in celery (skip was conflicting) --- src/pypelines/__init__.py | 2 +- src/pypelines/celery_tasks.py | 9 ++++++++- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index fa0af12..0fded8d 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.36" +__version__ = "0.0.37" from . import loggs from .pipes import * diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py index 501fce0..f672251 100644 --- a/src/pypelines/celery_tasks.py +++ b/src/pypelines/celery_tasks.py @@ -33,6 +33,7 @@ def get_runner(task_name: str): try: session = task.get_session() application = task.get_application() + arguments = task.arguments with LogTask(task) as log_object: logger = log_object.logger @@ -44,7 +45,13 @@ def get_runner(task_name: str): step: "BaseStep" = ( application.pipelines[task.pipeline_name].pipes[task.pipe_name].steps[task.step_name] ) - step.generate(session, extra=extra, skip=True, check_requirements=True, **task.arguments) + if arguments.get("refresh", False) or arguments.get("refresh_requirements", []): + skip = False + else: + skip = True + arguments.pop(skip) + + step.generate(session, extra=extra, skip=skip, check_requirements=True, **task.arguments) task.status_from_logs(log_object) except Exception as e: traceback_msg = format_traceback_exc() -- GitLab