diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index 0fded8de1c6ad4205b863337e8e420922e3ba245..369e013551ba746e18f4305423d304397470cb9c 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.37" +__version__ = "0.0.38" from . import loggs from .pipes import * diff --git a/src/pypelines/celery_tasks.py b/src/pypelines/celery_tasks.py index f6722515e46e074950ed035e6e293fc502eb14c2..b11b4362bdd3f1b5a733a4d62828b26851f9b793 100644 --- a/src/pypelines/celery_tasks.py +++ b/src/pypelines/celery_tasks.py @@ -365,6 +365,37 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | def run(self): return f"{node()} is happy to shake your hand and says hello !" + def get_signature_string(signature): + params = [ + param_value for param_name, param_value in signature.parameters.items() if param_name not in ["session"] + ] + return str(signature.replace(parameters=params))[1:-1].replace(" *,", "") + + class tasks_infos(Task): + name = "tasks_infos" + + def run(self, app_name): + app = APPLICATIONS_STORE[app_name] + tasks_dynamic_data = {} + pipelines = getattr(app, "pipelines", {}) + for pipeline in pipelines.values(): + for pipe in pipeline.pipes.values(): + for step in pipe.steps.values(): + if step.complete_name in app.tasks.keys(): + sig = get_signature_string(step.generate.__signature__) + doc = step.generate.__doc__ + task_data = { + "signature": sig, + "docstring": doc, + "step_name": step.step_name, + "pipe_name": step.pipe_name, + "pipeline_name": step.pipeline_name, + "requires": step.requires, + } + tasks_dynamic_data[step.complete_name] = task_data + return tasks_dynamic_data + app.register_task(handshake) + app.register_task(tasks_infos) return app diff --git a/src/pypelines/steps.py b/src/pypelines/steps.py index bd4b41959e95a1c9252b42d1a003a17a1a62388a..2a753f8794ceda8841cf467c96e973bf8f035bf2 100644 --- a/src/pypelines/steps.py +++ b/src/pypelines/steps.py @@ -430,9 
+430,68 @@ class BaseStep: # Replace the wrapper function's signature with the new one wrapper.__signature__ = original_signature.replace(parameters=original_params) + wrapper.__doc__ = self.generate_doc() return wrapper + def generate_doc(self) -> str: + new_doc = "" + doc = self.worker.__doc__ + if doc is None: + return new_doc + lines = doc.split("\n") + lines_count = len(lines) + inserted_chapter = False + new_chapter = """ + Pipeline Args: + skip (bool, optional) : If True and the data can be loaded, it will be skipped instead + (to avoid lengthy load time if one only wants to generate an output for later) + Particularly useful on a remote celery node where the result does not need to be returned, + for example. + Note that if it is True and that the result cannot be loaded, the generation mechanism + will of course happen, and the result will be saved if save_output is True + (usually, saved to a disk file). + Defaults to False. + refresh (bool, optional) : If True, it forces the generation mechanism to happen, even if a valid disk + file can be found. Note that, because refresh forces the generation whatever the load state and + skip tries to avoid having to load, only if the generation does not occur, calling skip=True + when refresh is also True is pointless. For this reason and to avoid user confusion, + calling with both True at the same time raises an error (with a help message telling you + to set one of them to False) + Defaults to False. + refresh_requirements (str, List[str], optional) : If set to a string or list of strings, + the steps that have a pipe_name or relative_name matching one of the strings supplied get refreshed + in the requirement tree check stage. For example, setting refresh_requirements=["trials_df"] + will trigger a refresh on all steps of the pipe trials_df that are encountered during + requirement tree check. 
For more specificity, setting refresh_requirements=["trials_df.my_step"] + will only refresh the step my_step of the pipe trials_df when encountered in the requirement tree. + You can cumulate multiple refresh conditions by including several strings in this list. + Defaults to empty list. + check_requirements (bool, optional) : If true, the requirement tree check stage will be triggered to + verify that the outputs of the steps required by the current call are already available. + If not, they will be generated and saved, before each stage in the requirement tree is run. + This should prevent errors or crashes due to requirements missing, and is the main desired feature + of the pypelines package. + It is set to False by default to avoid the users running into some issues in case they are starting + to use the package as possible data loss (processed data and not raw data if well used) + is at stake if user defined steps classes are misused / miscoded. + Defaults to False. + save_output (bool, optional) : If True, once the data is obtained through the generation mechanism + it is saved before being returned (if skip False). + (Data is usually saved to disk, but it depends on the disk_object implementation you selected, + and can be to a database or ram object serving as cache during a session, for example.) + If False, the data is not saved. This might be useful especially during development tests, + if tested with real data, and results are already loadable but you don't want to erase them by setting + refresh = True. + Defaults to True. + """ + for line_no, line in enumerate(lines): + if not inserted_chapter and ("Raises" in line or "Returns" in line or line_no >= lines_count - 1): + new_doc += new_chapter + "\n" + inserted_chapter = True + new_doc += line + "\n" + return new_doc + def get_default_extra(self): """Get default value of a function's parameter""" sig = inspect.signature(self.worker)