diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index 3c6d080f950aebd969842dfad6c7d5f9a5b2d5c0..17aaf53cab0c478e9b3b1a40d003bfdb60e6b7bd 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.62" +__version__ = "0.0.63" from . import loggs from .pipes import * diff --git a/src/pypelines/steps.py b/src/pypelines/steps.py index a37a99b0242de86cea4e2a11b62f12476f5030a0..0fd2bcc6c68565b11b28b7f41ce5ece1e505ea69 100644 --- a/src/pypelines/steps.py +++ b/src/pypelines/steps.py @@ -188,10 +188,9 @@ class BaseStep: """Return the result of calling the get_generate_wrapped method.""" return self.get_generate_wrapped() - # def make_wrapped_functions(self): - # self.save = self.make_wrapped_save() - # self.load = self.make_wrapped_load() - # self.generate = self.make_wrapped_generate() + @property + def run_callbacks(self): + return self.get_run_callbacks() def get_save_wrapped(self): """Returns a wrapped function that saves data using the disk class. @@ -288,6 +287,33 @@ class BaseStep: ) return autoload_arguments(loggedmethod(self.generation_mechanism), self) + def get_run_callbacks(self): + def wrapper(session, extra="", show_plots=True): + logger = logging.getLogger("callback_runner") + for callback_data in self.callbacks: + arguments = {"session": session, "extra": extra, "pipeline": self.pipeline} + if isinstance(callback_data, tuple): + callback = callback_data[0] + overriding_arguments = callback_data[1] + else: + callback = callback_data + overriding_arguments = {} + arguments.update(overriding_arguments) + on_what = f"{session.alias}.{extra}" if extra else session.alias + try: + logger.info(f"Running the callback {callback.__name__} on {on_what}") + callback(**arguments) + except Exception as e: + import traceback + + traceback_msg = traceback.format_exc() + logger.error(f"The callback {callback} failed with error : {e}") + logger.error(f"Full traceback below :\n{traceback_msg}") + + if self.do_dispatch: + return self.pipe.dispatcher(wrapper, "callbacks") + return wrapper + def get_level(self, selfish=False) -> int: """Get the level of the step. @@ -521,7 +547,7 @@ class BaseStep: if save_output: logger.save(f"Saving the generated {self.relative_name}{'.' + extra if extra else ''} output.") disk_object.save(result) - self.run_callbacks(session, extra, show_plots=False) + self.run_callbacks(session, extra=extra, show_plots=False) return result @@ -555,27 +581,6 @@ class BaseStep: return wrapper - def run_callbacks(self, session, extra="", show_plots=True) -> None: - logger = logging.getLogger("callback_runner") - for callback_data in self.callbacks: - arguments = {"session": session, "extra": extra, "pipeline": self.pipeline} - if isinstance(callback_data, tuple): - callback = callback_data[0] - overriding_arguments = callback_data[1] - else: - callback = callback_data - overriding_arguments = {} - arguments.update(overriding_arguments) - try: - logger.info(f"Running the callback {callback.__name__}") - callback(**arguments) - except Exception as e: - import traceback - - traceback_msg = traceback.format_exc() - logger.error(f"The callback {callback} failed with error : {e}") - logger.error("Full traceback below :\n" + traceback_msg) - def generate_doc(self) -> str: """Generate a new docstring by inserting a chapter about Pipeline Args before the existing docstring of the function.