From a4194c49e6bb884ab2d1fa2f5ec26337884957b4 Mon Sep 17 00:00:00 2001 From: Timothe Jost <timothe.jost@wanadoo.fr> Date: Mon, 3 Jun 2024 14:00:35 +0200 Subject: [PATCH] run callback got improved : ability to be dispatched or not --- src/pypelines/__init__.py | 2 +- src/pypelines/steps.py | 57 +++++++++++++++++++++------------------ 2 files changed, 32 insertions(+), 27 deletions(-) diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index 3c6d080..17aaf53 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.62" +__version__ = "0.0.63" from . import loggs from .pipes import * diff --git a/src/pypelines/steps.py b/src/pypelines/steps.py index a37a99b..0fd2bcc 100644 --- a/src/pypelines/steps.py +++ b/src/pypelines/steps.py @@ -188,10 +188,9 @@ class BaseStep: """Return the result of calling the get_generate_wrapped method.""" return self.get_generate_wrapped() - # def make_wrapped_functions(self): - # self.save = self.make_wrapped_save() - # self.load = self.make_wrapped_load() - # self.generate = self.make_wrapped_generate() + @property + def run_callbacks(self): + return self.get_run_callbacks() def get_save_wrapped(self): """Returns a wrapped function that saves data using the disk class. @@ -288,6 +287,33 @@ class BaseStep: ) return autoload_arguments(loggedmethod(self.generation_mechanism), self) + def get_run_callbacks(self): + def wrapper(session, extra="", show_plots=True): + logger = logging.getLogger("callback_runner") + for callback_data in self.callbacks: + arguments = {"session": session, "extra": extra, "pipeline": self.pipeline} + if isinstance(callback_data, tuple): + callback = callback_data[0] + overriding_arguments = callback_data[1] + else: + callback = callback_data + overriding_arguments = {} + arguments.update(overriding_arguments) + on_what = f"{session.alias}.{extra}" if extra else session.alias + try: + logger.info(f"Running the callback {callback.__name__} on {on_what}") + callback(**arguments) + except Exception as e: + import traceback + + traceback_msg = traceback.format_exc() + logger.error(f"The callback {callback} failed with error : {e}") + logger.error(f"Full traceback below :\n{traceback_msg}") + + if self.do_dispatch: + return self.pipe.dispatcher(wrapper, "callbacks") + return wrapper + def get_level(self, selfish=False) -> int: """Get the level of the step. @@ -521,7 +547,7 @@ class BaseStep: if save_output: logger.save(f"Saving the generated {self.relative_name}{'.' + extra if extra else ''} output.") disk_object.save(result) - self.run_callbacks(session, extra, show_plots=False) + self.run_callbacks(session, extra=extra, show_plots=False) return result @@ -555,27 +581,6 @@ class BaseStep: return wrapper - def run_callbacks(self, session, extra="", show_plots=True) -> None: - logger = logging.getLogger("callback_runner") - for callback_data in self.callbacks: - arguments = {"session": session, "extra": extra, "pipeline": self.pipeline} - if isinstance(callback_data, tuple): - callback = callback_data[0] - overriding_arguments = callback_data[1] - else: - callback = callback_data - overriding_arguments = {} - arguments.update(overriding_arguments) - try: - logger.info(f"Running the callback {callback.__name__}") - callback(**arguments) - except Exception as e: - import traceback - - traceback_msg = traceback.format_exc() - logger.error(f"The callback {callback} failed with error : {e}") - logger.error("Full traceback below :\n" + traceback_msg) - def generate_doc(self) -> str: """Generate a new docstring by inserting a chapter about Pipeline Args before the existing docstring of the function. -- GitLab