Commit c6895835 authored by Timothe Jost

ability to change inclusion policy for start and stop

parent e75f31b6
Pipeline #131718 passed
-__version__ = "0.0.60"
+__version__ = "0.0.61"
 from . import loggs
 from .pipes import *
@@ -715,7 +715,10 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
         Returns:
             dict: A dictionary containing information about remote tasks, including workers and task names.
         """
-        registered_tasks = self.control.inspect().registered_tasks()
+        try:
+            registered_tasks = self.control.inspect().registered_tasks()
+        except ConnectionResetError:
+            return None
         workers = []
         task_names = []
         if registered_tasks:
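
The hunk above makes the broker inspection fault-tolerant: if the connection to the message broker is reset mid-inspection, the method returns None instead of raising. As a hedged, standalone sketch of the same pattern (the list_registered_tasks name and the bare Celery("pypelines") app are illustrative assumptions; only app.control.inspect().registered_tasks() is the call actually used above):

    from celery import Celery

    app = Celery("pypelines")  # illustrative app; the real one is configured by create_celery_app from conf_path

    def list_registered_tasks(app):
        # Map of worker name -> list of registered task names, or None if the
        # broker connection is reset (mirroring the change in the hunk above).
        try:
            return app.control.inspect().registered_tasks()
        except ConnectionResetError:
            return None

Callers can then treat None as "no information available from the workers" rather than crashing.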
@@ -300,6 +300,10 @@ class BaseStep:
         self.pipeline.resolve()
         return StepLevel(self).resolve_level(selfish=selfish)
 
+    def is_required(self):
+        # TODO implement this (False if the step is not present in any other step's requirement stack, else True)
+        raise NotImplementedError
+
     def get_disk_object(self, session, extra=None):
         """Return a disk object based on the provided session and optional extra parameters.
@@ -517,29 +521,7 @@ class BaseStep:
             if save_output:
                 logger.save(f"Saving the generated {self.relative_name}{'.' + extra if extra else ''} output.")
                 disk_object.save(result)
-            # AFTER the saving has been done, if there is some callback function that should be run, we execute them
-            # If an exception is thrown in a callback, the whole pipeline will stop, intentionnaly.
-            # TODO an option could be added to catch, display and store exceptions tracebacks,
-            # while allowing the pipeline to continue,
-            # in case the callbacks are not absolutely necessary for the pipeline process. (ex, generate plots)
-            for callback_data in self.callbacks:
-                arguments = {"session": session, "extra": extra, "pipeline": self.pipeline}
-                if isinstance(callback_data, tuple):
-                    callback = callback_data[0]
-                    overriding_arguments = callback_data[1]
-                else:
-                    callback = callback_data
-                    overriding_arguments = {}
-                arguments.update(overriding_arguments)
-                try:
-                    callback(**arguments)
-                except Exception as e:
-                    import traceback
-                    traceback_msg = traceback.format_exc()
-                    logger.error(f"The callback {callback} failed with error : {e}")
-                    logger.error("Full traceback below :\n" + traceback_msg)
+            self.run_callbacks(session, extra, show_plots=False)
             return result
@@ -573,6 +555,27 @@ class BaseStep:
         return wrapper
 
+    def run_callbacks(self, session, extra="", show_plots=True) -> None:
+        logger = logging.getLogger("callback_runner")
+        for callback_data in self.callbacks:
+            arguments = {"session": session, "extra": extra, "pipeline": self.pipeline}
+            if isinstance(callback_data, tuple):
+                callback = callback_data[0]
+                overriding_arguments = callback_data[1]
+            else:
+                callback = callback_data
+                overriding_arguments = {}
+            arguments.update(overriding_arguments)
+            try:
+                logger.info(f"Running the callback {callback.__name__}")
+                callback(**arguments)
+            except Exception as e:
+                import traceback
+                traceback_msg = traceback.format_exc()
+                logger.error(f"The callback {callback} failed with error : {e}")
+                logger.error("Full traceback below :\n" + traceback_msg)
+
     def generate_doc(self) -> str:
         """Generate a new docstring by inserting a chapter about Pipeline Args before the existing
         docstring of the function.
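
The last two hunks refactor callback execution: the inline loop in the generation path is replaced by a single self.run_callbacks(session, extra, show_plots=False) call, and the new method logs each callback, passes it session, extra and pipeline as keyword arguments (a tuple entry can override them), and logs the traceback of a failing callback instead of letting it abort the pipeline. A hedged registration sketch; plot_results and the direct append to my_step.callbacks are illustrative, since the real registration API is not shown in this diff:

    def plot_results(session, extra, pipeline, show=True):
        # e.g. build figures from the step's freshly saved output
        ...

    # Bare callable: invoked as plot_results(session=..., extra=..., pipeline=...)
    my_step.callbacks.append(plot_results)

    # Tuple form: the dict is merged over the default keyword arguments
    my_step.callbacks.append((plot_results, {"show": False}))

Catching Exception per callback keeps optional post-processing such as plotting from aborting an otherwise successful generation, which is what the removed TODO comment was asking for.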