From c6895835a895343a44fc10182d8e6110cb259b49 Mon Sep 17 00:00:00 2001
From: Timothe Jost <timothe.jost@wanadoo.fr>
Date: Thu, 30 May 2024 18:55:48 +0200
Subject: [PATCH] ability to change inclusion policy for start and stop

---
 src/pypelines/__init__.py     |  2 +-
 src/pypelines/celery_tasks.py |  5 +++-
 src/pypelines/steps.py        | 49 +++++++++++++++++++----------------
 3 files changed, 31 insertions(+), 25 deletions(-)

diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py
index 4fd42c1..5a803c9 100644
--- a/src/pypelines/__init__.py
+++ b/src/pypelines/__init__.py
@@ -1,4 +1,4 @@
-__version__ = "0.0.60"
+__version__ = "0.0.61"
 
 from . import loggs
 from .pipes import *
diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py
index 39fc47f..f4d771a 100644
--- a/src/pypelines/celery_tasks.py
+++ b/src/pypelines/celery_tasks.py
@@ -715,7 +715,10 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
         Returns:
             dict: A dictionary containing information about remote tasks, including workers and task names.
         """
-        registered_tasks = self.control.inspect().registered_tasks()
+        try:
+            registered_tasks = self.control.inspect().registered_tasks()
+        except ConnectionResetError:
+            return None
         workers = []
         task_names = []
         if registered_tasks:
diff --git a/src/pypelines/steps.py b/src/pypelines/steps.py
index c9a6ebf..a37a99b 100644
--- a/src/pypelines/steps.py
+++ b/src/pypelines/steps.py
@@ -300,6 +300,10 @@ class BaseStep:
         self.pipeline.resolve()
         return StepLevel(self).resolve_level(selfish=selfish)
 
+    def is_required(self):
+        # TODO implement this (False if the step is not present in any other step' requirement stack, else True)
+        raise NotImplementedError
+
     def get_disk_object(self, session, extra=None):
         """Return a disk object based on the provided session and optional extra parameters.
 
@@ -517,29 +521,7 @@ class BaseStep:
             if save_output:
                 logger.save(f"Saving the generated {self.relative_name}{'.' + extra if extra else ''} output.")
                 disk_object.save(result)
-
-                # AFTER the saving has been done, if there is some callback function that should be run, we execute them
-                # If an exception is thrown in a callback, the whole pipeline will stop, intentionnaly.
-                # TODO an option could be added to catch, display and store exceptions tracebacks,
-                # while allowing the pipeline to continue,
-                # in case the callbacks are not absolutely necessary for the pipeline process. (ex, generate plots)
-                for callback_data in self.callbacks:
-                    arguments = {"session": session, "extra": extra, "pipeline": self.pipeline}
-                    if isinstance(callback_data, tuple):
-                        callback = callback_data[0]
-                        overriding_arguments = callback_data[1]
-                    else:
-                        callback = callback_data
-                        overriding_arguments = {}
-                    arguments.update(overriding_arguments)
-                    try:
-                        callback(**arguments)
-                    except Exception as e:
-                        import traceback
-
-                        traceback_msg = traceback.format_exc()
-                        logger.error(f"The callback {callback} failed with error : {e}")
-                        logger.error("Full traceback below :\n" + traceback_msg)
+                self.run_callbacks(session, extra, show_plots=False)
 
             return result
 
@@ -573,6 +555,27 @@ class BaseStep:
 
         return wrapper
 
+    def run_callbacks(self, session, extra="", show_plots=True) -> None:
+        logger = logging.getLogger("callback_runner")
+        for callback_data in self.callbacks:
+            arguments = {"session": session, "extra": extra, "pipeline": self.pipeline}
+            if isinstance(callback_data, tuple):
+                callback = callback_data[0]
+                overriding_arguments = callback_data[1]
+            else:
+                callback = callback_data
+                overriding_arguments = {}
+            arguments.update(overriding_arguments)
+            try:
+                logger.info(f"Running the callback {callback.__name__}")
+                callback(**arguments)
+            except Exception as e:
+                import traceback
+
+                traceback_msg = traceback.format_exc()
+                logger.error(f"The callback {callback} failed with error : {e}")
+                logger.error("Full traceback below :\n" + traceback_msg)
+
     def generate_doc(self) -> str:
         """Generate a new docstring by inserting a chapter about Pipeline Args before the existing
         docstring of the function.
-- 
GitLab