From 7b74d49b0a65e8750f65c10383bd95be62a7bc54 Mon Sep 17 00:00:00 2001 From: Timothe Jost <timothe.jost@wanadoo.fr> Date: Fri, 13 Sep 2024 15:21:23 +0200 Subject: [PATCH] adding more tests to support the three pipeline creation strategies and to test requirements processing --- src/pypelines/__init__.py | 2 +- src/pypelines/pipes.py | 13 +++++--- src/pypelines/steps.py | 37 ++++++++++++++++----- src/pypelines/utils.py | 2 +- tests/test_core.py | 68 +++++++++++++++++++++++++++++++++++++-- 5 files changed, 105 insertions(+), 17 deletions(-) diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index 9596bb4..37bc4b2 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.73" +__version__ = "0.0.74" from . import loggs from .pipes import * diff --git a/src/pypelines/pipes.py b/src/pypelines/pipes.py index 785a37a..1f9391d 100644 --- a/src/pypelines/pipes.py +++ b/src/pypelines/pipes.py @@ -37,6 +37,8 @@ class BasePipe(BasePipeType, metaclass=ABCMeta): disk_class: Type[BaseDiskObject] = BaseDiskObject multisession_class: Type[BaseMultisessionAccessor] = BaseMultisessionAccessor + pipe_name: str + steps: Dict[str, BaseStep] def __init__(self, parent_pipeline: "Pipeline") -> None: @@ -70,8 +72,14 @@ class BasePipe(BasePipeType, metaclass=ABCMeta): """ logger = getLogger("Pipe") + self.pipe_name = ( + to_snake_case(self.pipe_name) + if getattr(self, "pipe_name", None) is not None + else to_snake_case(self.__class__.__name__) + ) + self.pipeline = parent_pipeline - self.pipe_name = to_snake_case(self.__class__.__name__) + # pipeline.pipes.pipe will thus work whatever if the object in pipelines.pipes is a step or a pipe self.pipe = self @@ -144,9 +152,6 @@ class BasePipe(BasePipeType, metaclass=ABCMeta): # TODO : eventually scan requirements strings / objects to rebind # TODO : them to the local pipeline correspunding objects - if not hasattr(instanciated_step, "disk_class"): - instanciated_step.disk_class = self.disk_class - if instanciated_step.step_name in self.steps.keys(): raise AttributeError( "Cannot attach two steps of the same name via different methods to a single pipe." diff --git a/src/pypelines/steps.py b/src/pypelines/steps.py index 3aac715..7669f21 100644 --- a/src/pypelines/steps.py +++ b/src/pypelines/steps.py @@ -17,7 +17,7 @@ if TYPE_CHECKING: from .tasks import BaseStepTaskManager -def stepmethod(requires=[], version=None, do_dispatch=True, on_save_callbacks=[]): +def stepmethod(requires=None, version=None, do_dispatch=None, on_save_callbacks=None, disk_class=None, step_name=None): """Wrapper to attach some attributes to a method of a pipeline's pipe. These methods are necessary to trigger the pipeline creation mechanism on that step_method after the pipe has been fully defined. @@ -53,12 +53,20 @@ def stepmethod(requires=[], version=None, do_dispatch=True, on_save_callbacks=[] Callable: The registered function with additional attributes such as 'requires', 'is_step', 'version', 'do_dispatch', 'step_name', and 'callbacks'. """ - function.requires = [requires] if not isinstance(requires, list) else requires function.is_step = True - function.version = version - function.do_dispatch = do_dispatch - function.step_name = to_snake_case(function.__name__) - function.callbacks = [on_save_callbacks] if not isinstance(on_save_callbacks, list) else on_save_callbacks + + if requires is not None: + function.requires = requires + if version is not None: + function.version = version + if do_dispatch is not None: + function.do_dispatch = do_dispatch + if step_name is not None: + function.step_name = step_name + if on_save_callbacks is not None: + function.callbacks = on_save_callbacks + if disk_class is not None: + function.disk_class = disk_class return function return registrate @@ -68,7 +76,7 @@ class BaseStep: step_name: str - requires: List["BaseStep"] + requires: List["BaseStep"] | List[str] | str version: str | int do_dispatch: bool callbacks: List[Callable] @@ -112,11 +120,24 @@ class BaseStep: # we attach the values of the worker elements to the Step # as they are get only (no setter) on worker if it is not None (bound method) - self.do_dispatch = self.get_attribute_or_default("do_dispatch", False) + self.do_dispatch = self.get_attribute_or_default("do_dispatch", True) + self.version = self.get_attribute_or_default("version", 0) + self.requires = self.get_attribute_or_default("requires", []) + self.requires = [self.requires] if not isinstance(self.requires, list) else self.requires + + self.disk_class = self.get_attribute_or_default("disk_class", getattr(self.pipe, "disk_class")) + if self.disk_class is None: + raise AttributeError( + f"disk_class of step {self.step_name} should be : \n" + " - defined through decorator\n" + " - defined with the disk_class attribute of the Step\n" + " - defined with the disk_class attribute of the Pipe class that the Step is bound to\n" + ) self.callbacks = self.get_attribute_or_default("callbacks", []) + self.callbacks = [self.callbacks] if not isinstance(self.callbacks, list) else self.callbacks # self.make_wrapped_functions() diff --git a/src/pypelines/utils.py b/src/pypelines/utils.py index e2820c1..f434f32 100644 --- a/src/pypelines/utils.py +++ b/src/pypelines/utils.py @@ -1,7 +1,7 @@ import re -def to_snake_case(text): +def to_snake_case(text: str): # Replace spaces or hyphens with underscores text = re.sub(r"[\s-]+", "_", text) # Convert CamelCase to snake_case diff --git a/tests/test_core.py b/tests/test_core.py index cf7eb1a..b203022 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -10,8 +10,8 @@ import pytest from pypelines import examples from pypelines.sessions import Session -from pypelines import Pipeline, stepmethod, BaseStep -from pypelines.pickle_backend import PicklePipe +from pypelines import Pipeline, stepmethod, BaseStep, BasePipe +from pypelines.pickle_backend import PicklePipe, PickleDiskObject from pathlib import Path @@ -28,6 +28,18 @@ def pipeline_method_based(): def my_step(self, session, extra=""): return "a_good_result" + @test_pipeline.register_pipe + class complex_pipe(BasePipe): + + @stepmethod(disk_class=PickleDiskObject) + def my_step_name(self, session, extra=""): + return 154 + + @stepmethod(requires="complex_pipe.my_step_name", disk_class=PickleDiskObject) + def another_name(self, session, extra=""): + data = self.load_requirement("complex_pipe", session, extra=extra) + return data - 100 + return test_pipeline @@ -44,6 +56,21 @@ def pipeline_steps_group_class_based(): my_step.requires = [] + @test_pipeline.register_pipe + class complex_pipe(BasePipe): + + class Steps: + @stepmethod(disk_class=PickleDiskObject) + def my_step_name(self, session, extra=""): + return 154 + + def another_name(self, session, extra=""): + data = self.load_requirement("complex_pipe", session, extra=extra) + return data - 100 + + another_name.requires = "complex_pipe.my_step_name" + another_name.disk_class = PickleDiskObject + return test_pipeline @@ -59,6 +86,27 @@ def pipeline_class_based(): def worker(self, session, extra=""): return "a_good_result" + @test_pipeline.register_pipe + class MyComplexPipe(BasePipe): + + pipe_name = "complex_pipe" + + class MyStep(BaseStep): + def worker(self, session, extra=""): + return 154 + + disk_class = PickleDiskObject + step_name = "my_step_name" + + class MyStep2(BaseStep): + def worker(self, session, extra=""): + data = self.load_requirement("complex_pipe", session, extra=extra) + return data - 100 + + step_name = "another_name" + requires = "complex_pipe.my_step_name" + disk_class = PickleDiskObject + return test_pipeline @@ -105,7 +153,7 @@ def test_pypeline_creation(request, pipeline_fixture_name): @pytest.mark.parametrize("pipeline_fixture_name", get_pipelines_fixtures()) def test_pypeline_call(request, pipeline_fixture_name: str, session): - pipeline = request.getfixturevalue(pipeline_fixture_name) + pipeline: Pipeline = request.getfixturevalue(pipeline_fixture_name) # expecting the output to not be present if the pipeline step was not generated first with pytest.raises(ValueError): @@ -123,3 +171,17 @@ def test_pypeline_call(request, pipeline_fixture_name: str, session): # expecting the output to be present now assert pipeline.my_pipe.my_step.load(session) == "a_good_result" + + +@pytest.mark.parametrize("pipeline_fixture_name", get_pipelines_fixtures()) +def test_pypeline_requirement_stack(request, pipeline_fixture_name: str, session): + pipeline: Pipeline = request.getfixturevalue(pipeline_fixture_name) + + # before being resolved (called) + assert pipeline.complex_pipe.another_name.requires == ["complex_pipe.my_step_name"] + + # expect no result is present on disk because we didn't check_requirements + with pytest.raises(ValueError): + pipeline.complex_pipe.another_name.generate(session) + + assert pipeline.complex_pipe.another_name.generate(session, check_requirements=True) == 54 -- GitLab