Skip to content
Snippets Groups Projects
Commit d6b328c8 authored by Timothe Jost's avatar Timothe Jost
Browse files

adding tasks_infos method to celery apps to describe data

parent 972d8140
No related branches found
No related tags found
No related merge requests found
Pipeline #127190 passed
__version__ = "0.0.37" __version__ = "0.0.38"
from . import loggs from . import loggs
from .pipes import * from .pipes import *
......
...@@ -365,6 +365,37 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery | ...@@ -365,6 +365,37 @@ def create_celery_app(conf_path, app_name="pypelines", v_host=None) -> "Celery |
def run(self): def run(self):
return f"{node()} is happy to shake your hand and says hello !" return f"{node()} is happy to shake your hand and says hello !"
def get_signature_string(signature):
    """Render a signature as a bare, comma-separated parameter list.

    The ``session`` parameter is dropped, the surrounding parentheses are
    removed and the bare ``*`` keyword-only marker is stripped, so the result
    looks like ``"extra=None, skip=False"``.

    Args:
        signature: an ``inspect.Signature`` (anything exposing ``parameters``,
            ``replace`` and ``empty`` the same way).

    Returns:
        str: the parameter list without parentheses; empty when no parameter
        is left.
    """
    kept_params = [
        param_value for param_name, param_value in signature.parameters.items() if param_name not in ["session"]
    ]
    # Drop the return annotation: str() would otherwise end with "-> X" and the
    # [1:-1] slice below would cut the annotation instead of the closing ")".
    trimmed = signature.replace(parameters=kept_params, return_annotation=signature.empty)
    rendered = str(trimmed)[1:-1]
    # When every remaining parameter is keyword-only the marker comes first
    # ("*, skip=False") and has no leading space for the replace below to match.
    if rendered.startswith("*, "):
        rendered = rendered[len("*, "):]
    return rendered.replace(" *,", "")
class tasks_infos(Task):
    """Task that describes the pipeline steps registered as tasks on an application."""

    name = "tasks_infos"

    def run(self, app_name):
        """Collect descriptive data for every step that is also a registered task.

        Args:
            app_name: key used to look the application up in APPLICATIONS_STORE.

        Returns:
            dict: maps each matching step's ``complete_name`` to a dict holding
            its ``signature``, ``docstring``, ``step_name``, ``pipe_name``,
            ``pipeline_name`` and ``requires``.
        """
        app = APPLICATIONS_STORE[app_name]
        described_tasks = {}

        # An application without a "pipelines" attribute yields an empty result.
        for pipeline in getattr(app, "pipelines", {}).values():
            for pipe in pipeline.pipes.values():
                for step in pipe.steps.values():
                    # Only steps registered as tasks on the app are described.
                    if step.complete_name not in app.tasks.keys():
                        continue

                    described_tasks[step.complete_name] = {
                        "signature": get_signature_string(step.generate.__signature__),
                        "docstring": step.generate.__doc__,
                        "step_name": step.step_name,
                        "pipe_name": step.pipe_name,
                        "pipeline_name": step.pipeline_name,
                        "requires": step.requires,
                    }

        return described_tasks
app.register_task(handshake) app.register_task(handshake)
app.register_task(tasks_infos)
return app return app
...@@ -430,9 +430,68 @@ class BaseStep: ...@@ -430,9 +430,68 @@ class BaseStep:
# Replace the wrapper function's signature with the new one # Replace the wrapper function's signature with the new one
wrapper.__signature__ = original_signature.replace(parameters=original_params) wrapper.__signature__ = original_signature.replace(parameters=original_params)
wrapper.__doc__ = self.generate_doc()
return wrapper return wrapper
def generate_doc(self) -> str:
    """Return the worker's docstring with a "Pipeline Args" chapter spliced in.

    The chapter is inserted right before the first ``Raises`` / ``Returns``
    section header of the worker docstring. When there is no such header, it
    goes before a trailing blank line (typically the closing-quotes line), or
    after the last line when that line carries text, so a one-line docstring
    keeps its summary first.

    Only a line made of the keyword, optionally followed by ``:``, counts as a
    header; a prose line such as "Returns the mean." does not.

    Returns:
        str: the augmented docstring, every line terminated by a newline, or
        an empty string when the worker has no docstring.
    """
    doc = self.worker.__doc__
    if doc is None:
        return ""

    # NOTE: the chapter is inserted verbatim; its indentation is not adjusted
    # to the indentation of the worker docstring.
    new_chapter = """
Pipeline Args:
skip (bool, optional) : If True and the data can be loaded, it will be skipped instead
(to avoid lengthy load time if one only wants to generate an output for later)
Particularly usefull on a remove celery node where the result does not need to be returned,
for example.
Note that if it is True and that the result cannot be loaded, the generation mechanism
will of course happen, and the result will be saved if save_output is True
(usually, saved to a disk file).
Defaults to False.
refresh (bool, optional) : If True, it forces the generation mechanism to happen, even if a valid disk
file can be found. Note that, because refresh forces the generation whatever the load state and
skip tries to avoid having to load, only if the generation does not occur, calling skip=True
when refresh is also True is pointless. For this reason and to avoid user confusion,
calling with both True at the same time raises and error (with an help message telling you
to set one of them to False)
Defaults to False
refresh_requirements (str, List[str], optional) : If set to a string or list of strings,
the steps that have a pipe_name or relative_name matching one of the strings supplied get refreshed
in the requirement tree check stage. For example, setting refresh_requirements=["trials_df"]
will trigger a refresh on all steps of the pipe trials_df that are encountered during
requirement tree check. For more specificity, setting refresh_requirements=["trials_df.my_step"]
will only refresh the step my_step of the pipe trials_df when encountered in the requirement tree.
You can cumulate multiple refresh conditions by including several strings in this list.
Defaults to empty list.
check_requirements (bool, optional) : If true, the requirement tree check stage will be triggered to
verify that the outputs of the steps required by the current call are already available.
If not, they will be generated and saved, before each stage in the requirement tree is run.
This should prevent errors or crashes due to requirements missing, and is the main desired feature
of the pypelines package.
It is set to False by default to avoid the users running into some issues in case they are starting
to use the package as possible data loss (processed data and not raw data if well used)
is at stake if user defined steps classes are misused / miscoded.
Defaults to False.
save_output (bool, optional) : If True, once the data is obtained throught the generation mechanism
it is saved before being returned (if skip False).
(Data is usually saved to disk, but it depends on the disk_object implementation you selected,
and can be to a database or ram object serving as cache during a session, for example.)
If False, the data is not saved. This might be usefull especially during developpements tests,
if tested with real data, and results are already loadable but you don't want to erase it by setting
refresh = True.
Defaults to True.
"""

    def is_section_header(line):
        # True for "Raises", "Raises:", "Returns :", ... but not for prose
        # that merely starts with or contains one of these words.
        stripped = line.strip()
        for keyword in ("Raises", "Returns"):
            if stripped.startswith(keyword):
                remainder = stripped[len(keyword):].lstrip()
                return remainder == "" or remainder.startswith(":")
        return False

    lines = doc.split("\n")
    insert_at = next((index for index, line in enumerate(lines) if is_section_header(line)), None)
    if insert_at is None:
        # No section header: keep a trailing blank line last, otherwise append
        # the chapter after the final line of text.
        insert_at = len(lines) - 1 if not lines[-1].strip() else len(lines)

    lines.insert(insert_at, new_chapter)
    return "".join(line + "\n" for line in lines)
def get_default_extra(self): def get_default_extra(self):
"""Get default value of a function's parameter""" """Get default value of a function's parameter"""
sig = inspect.signature(self.worker) sig = inspect.signature(self.worker)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment