diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index 4fd42c1fc9c76df4528b4372bb4ba199baa153d3..5a803c999e1c08fbc14fccfaa803a8e02ac723fb 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.60" +__version__ = "0.0.61" from . import loggs from .pipes import * diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py index 39fc47fc66d6889a679d22512f2ba23c9d16ae92..f4d771a030ca217a5a488ab9b768de8b478ded8f 100644 --- a/src/pypelines/celery_tasks.py +++ b/src/pypelines/celery_tasks.py @@ -715,7 +715,10 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | Returns: dict: A dictionary containing information about remote tasks, including workers and task names. """ - registered_tasks = self.control.inspect().registered_tasks() + try: + registered_tasks = self.control.inspect().registered_tasks() + except ConnectionResetError: + return None workers = [] task_names = [] if registered_tasks: diff --git a/src/pypelines/steps.py b/src/pypelines/steps.py index c9a6ebfe47e2c180df3d975e93c1d4fec6710682..a37a99b0242de86cea4e2a11b62f12476f5030a0 100644 --- a/src/pypelines/steps.py +++ b/src/pypelines/steps.py @@ -300,6 +300,10 @@ class BaseStep: self.pipeline.resolve() return StepLevel(self).resolve_level(selfish=selfish) + def is_required(self): + # TODO implement this (False if the step is not present in any other step' requirement stack, else True) + raise NotImplementedError + def get_disk_object(self, session, extra=None): """Return a disk object based on the provided session and optional extra parameters. @@ -517,29 +521,7 @@ class BaseStep: if save_output: logger.save(f"Saving the generated {self.relative_name}{'.' + extra if extra else ''} output.") disk_object.save(result) - - # AFTER the saving has been done, if there is some callback function that should be run, we execute them - # If an exception is thrown in a callback, the whole pipeline will stop, intentionnaly. - # TODO an option could be added to catch, display and store exceptions tracebacks, - # while allowing the pipeline to continue, - # in case the callbacks are not absolutely necessary for the pipeline process. (ex, generate plots) - for callback_data in self.callbacks: - arguments = {"session": session, "extra": extra, "pipeline": self.pipeline} - if isinstance(callback_data, tuple): - callback = callback_data[0] - overriding_arguments = callback_data[1] - else: - callback = callback_data - overriding_arguments = {} - arguments.update(overriding_arguments) - try: - callback(**arguments) - except Exception as e: - import traceback - - traceback_msg = traceback.format_exc() - logger.error(f"The callback {callback} failed with error : {e}") - logger.error("Full traceback below :\n" + traceback_msg) + self.run_callbacks(session, extra, show_plots=False) return result @@ -573,6 +555,27 @@ class BaseStep: return wrapper + def run_callbacks(self, session, extra="", show_plots=True) -> None: + logger = logging.getLogger("callback_runner") + for callback_data in self.callbacks: + arguments = {"session": session, "extra": extra, "pipeline": self.pipeline} + if isinstance(callback_data, tuple): + callback = callback_data[0] + overriding_arguments = callback_data[1] + else: + callback = callback_data + overriding_arguments = {} + arguments.update(overriding_arguments) + try: + logger.info(f"Running the callback {callback.__name__}") + callback(**arguments) + except Exception as e: + import traceback + + traceback_msg = traceback.format_exc() + logger.error(f"The callback {callback} failed with error : {e}") + logger.error("Full traceback below :\n" + traceback_msg) + def generate_doc(self) -> str: """Generate a new docstring by inserting a chapter about Pipeline Args before the existing docstring of the function.