diff --git a/src/pypelines/__init__.py b/src/pypelines/__init__.py index bec4fca0bd04549887208bd10b4502e93fe84525..9596bb45fa6164cc8b262b59b79936c60f221d0c 100644 --- a/src/pypelines/__init__.py +++ b/src/pypelines/__init__.py @@ -1,4 +1,4 @@ -__version__ = "0.0.72" +__version__ = "0.0.73" from . import loggs from .pipes import * diff --git a/src/pypelines/pipes.py b/src/pypelines/pipes.py index 86f8731a21cae0c4f913dee21604dc53ca0191b4..785a37a40103e2066c1a6214ebd148db335e6c79 100644 --- a/src/pypelines/pipes.py +++ b/src/pypelines/pipes.py @@ -10,10 +10,12 @@ import inspect, hashlib from pandas import DataFrame from abc import ABCMeta, abstractmethod +from copy import deepcopy -from typing import Callable, Type, Iterable, Protocol, TYPE_CHECKING, Literal, Dict +from typing import Callable, Type, Iterable, Protocol, TYPE_CHECKING, Literal, Dict, Set from types import MethodType +from logging import getLogger if TYPE_CHECKING: from .pipelines import Pipeline @@ -66,64 +68,95 @@ class BasePipe(BasePipeType, metaclass=ABCMeta): Returns: None """ + logger = getLogger("Pipe") + self.pipeline = parent_pipeline self.pipe_name = to_snake_case(self.__class__.__name__) + # pipeline.pipes.pipe will thus work whatever if the object in pipelines.pipes is a step or a pipe + self.pipe = self - _steps: Dict[str, MethodType] = {} - - steps_members_scanner = inspect.getmembers(self, predicate=inspect.ismethod) - requires_is_step_attr = True - - for class_name, class_object in inspect.getmembers(self, predicate=inspect.isclass): - if class_name == "Steps": - steps_members_scanner = inspect.getmembers(class_object(), predicate=inspect.ismethod) - requires_is_step_attr = False + step_methods: Set[MethodType] = set() + + # first strategy, scans decorated methods that belongs to the pipe, and have an is_step attribute + # (decorated, directly in the pipe class) + for attribute_name, method in inspect.getmembers(self, predicate=inspect.ismethod): + if getattr(method, "is_step", False): + step_methods.add(method) + + # second strategy, scans the Steps pipe class, if existing, and get all of it's methods to make steps. + # (decorated or not, and grouped under Steps class, to differentiate them from non Step methods of the pipe) + for attribute_name, class_object in inspect.getmembers(self, predicate=inspect.isclass): + if attribute_name == "Steps": + for sub_attribute_name, method in inspect.getmembers(class_object(), predicate=inspect.ismethod): + # if getattr(method, "is_step", False): + step_methods.add(method) break - # this loop populates self.steps dictionnary from the instanciated (bound) step methods. - for step_name, step in steps_members_scanner: - if not requires_is_step_attr or getattr(step, "is_step", False): - step_name = to_snake_case(step_name) - _steps[step_name] = step - - for step_name, step in inspect.getmembers(self, predicate=inspect.isclass): - if - - if len(_steps) < 1: + # third strategy, by checking BaseStep inheriting classes defined in the Pipe + steps_classes: Set[Type[BaseStep]] = set() + for attribute_name, class_object in inspect.getmembers(self, predicate=inspect.isclass): + # The attribute Pipe.step_class is a child of BasePipe but we don't want to instantiate it as a step. + # It is meant to be used as a contructor class for making steps out of methods, + # #with first or second strategy + if attribute_name == "step_class": + continue + if issubclass(class_object, BaseStep): + steps_classes.add(class_object) + + if len(steps_classes) + len(step_methods) < 1: raise ValueError( - f"You should register at least one step class with @stepmethod in {self.pipe_name} class. {_steps=}" + f"You should register at least one step in the pipe:{self.pipe_name} " + f"of the pipeline {self.pipeline.pipeline_name}. " + f"{step_methods=}, {steps_classes=}." ) - # if len(_steps) > 1 and self.single_step: - # raise ValueError( - # f"Cannot set single_step to True if you registered more than one step inside {self.pipe_name} class." - # f" { _steps = }" - # ) + self.steps = {} + # We instanciate decorated methods steps using the step_class defined in the pipe + for step_worker_method in step_methods: + logger.debug(f"defining {step_worker_method} from method in {self}") + instanciated_step = self.step_class(pipeline=self.pipeline, pipe=self, worker=step_worker_method) + self.attach_step(instanciated_step) + + # We instanciate class based steps using their class directly + for step_class in steps_classes: + logger.debug(f"defining {step_class} from class in {self}") + instanciated_step = step_class(pipeline=self.pipeline, pipe=self) + self.attach_step(instanciated_step) + + self.verify_hierarchical_requirements() - number_of_steps_with_requirements = 0 - for step in _steps.values(): - if len(getattr(step, "requires", [])): - number_of_steps_with_requirements += 1 + def verify_hierarchical_requirements(self): - if number_of_steps_with_requirements < len(_steps) - 1: + number_of_steps_with_requirements = len([True for step in self.steps.values() if len(step.requires) != 0]) + + if number_of_steps_with_requirements < len(self.steps) - 1: raise ValueError( "Steps of a single pipe must be linked in hierarchical order : Cannot have a single pipe with N steps" " (N>1) and have no `requires` specification for at least N-1 steps." ) - # this loop populates self.steps and replacs the bound methods with usefull Step objects. - # They must inherit from BaseStep - self.steps = {} - for step_name, step in _steps.items(): - step = self.step_class(self.pipeline, self, step, step_name=step_name) # , step_name) - self.steps[step_name] = step # replace the bound_method by a step_class using that bound method, - # so that we attach the necessary components to it. - setattr(self, step_name, step) - - # below is just a syntaxic sugar to help in case the pipe is "single_step" - # so that we can access any pipe instance in pipeline with simple iteration on - # pipeline.pipes.pipe, whatever if the object in pipelines.pipes is a step or a pipe - self.pipe = self + def attach_step(self, instanciated_step: "BaseStep", rebind=False): + + if rebind: + instanciated_step = deepcopy(instanciated_step) + instanciated_step.pipeline = self.pipeline + instanciated_step.pipe = self.pipe + # TODO : eventually scan requirements strings / objects to rebind + # TODO : them to the local pipeline correspunding objects + + if not hasattr(instanciated_step, "disk_class"): + instanciated_step.disk_class = self.disk_class + + if instanciated_step.step_name in self.steps.keys(): + raise AttributeError( + "Cannot attach two steps of the same name via different methods to a single pipe." + f" The step named {instanciated_step.step_name} is attached through two " + "mechanisms or has two methods with conflicting names." + ) + + self.steps[instanciated_step.step_name] = instanciated_step + setattr(self, instanciated_step.step_name, instanciated_step) + self.pipeline.resolved = False @property def version(self): diff --git a/src/pypelines/steps.py b/src/pypelines/steps.py index 1093e8f40b5d55b8f19ad98f142c5cd5db64ecf9..3aac7156c4c8f02ef5677dc2ddf30e16a9084d5f 100644 --- a/src/pypelines/steps.py +++ b/src/pypelines/steps.py @@ -73,14 +73,14 @@ class BaseStep: do_dispatch: bool callbacks: List[Callable] + disk_class: "Type[BaseDiskObject]" + task: "BaseStepTaskManager" worker: Callable pipe: "BasePipe" pipeline: "Pipeline" - def __init__( - self, pipeline: "Pipeline", pipe: "BasePipe", worker: Optional[MethodType] = None, step_name: str = "" - ): + def __init__(self, pipeline: "Pipeline", pipe: "BasePipe", worker: Optional[MethodType] = None): """Initialize a BaseStep object. Args: @@ -105,50 +105,58 @@ class BaseStep: # save an instanciated access to the pipe parent self.pipe = pipe - self.step_name = to_snake_case(self.get_attribute_or_default("step_name", step_name)) - - if not self.step_name: - raise ValueError(f"Step name in {self.pipe.pipe_name} cannot be blank nor None") + self.step_name = self.get_step_name(worker) # save an instanciated access to the step function (undecorated) - if not hasattr(self, "worker"): - if worker is None: - raise AttributeError( - f"For the step : {self.pipe.pipe_name}.{self.step_name}, a worker method must " - "be defined if created from a class" - ) - needs_attachment = True - self.worker = worker - else: - needs_attachment = False - - # we attach the values of the worker elements to BaseStep - # as they are get only (no setter) on worker (bound method) - - getattr(self, "do_dispatch", getattr(self.worker, "do_dispatch", False)) + self.find_and_bind_worker(worker) + # we attach the values of the worker elements to the Step + # as they are get only (no setter) on worker if it is not None (bound method) self.do_dispatch = self.get_attribute_or_default("do_dispatch", False) self.version = self.get_attribute_or_default("version", 0) self.requires = self.get_attribute_or_default("requires", []) self.callbacks = self.get_attribute_or_default("callbacks", []) - if needs_attachment: - self.worker = MethodType(worker.__func__, self) - # self.make_wrapped_functions() - update_wrapper(self, self.worker) - # update_wrapper(self.generate, self.worker) - self.multisession = self.pipe.multisession_class(self) self.task = self.pipeline.runner_backend.create_task_manager(self) def get_attribute_or_default(self, attribute_name: str, default: Any) -> Any: - # TODO : fix here , when calling get_attribute_or_default before worker s set, cannot work return getattr(self, attribute_name, getattr(self.worker, attribute_name, default)) + def find_and_bind_worker(self, worker_object: Optional[MethodType]): + if not hasattr(self, "worker"): + if worker_object is None: + AttributeError(self.worker_unfindable_message) + else: + self.worker = MethodType(worker_object.__func__, self) + update_wrapper(self, self.worker) + # else worker is already bound + + @property + def worker_unfindable_message(self): + return ( + f"For the step : {self.pipe.pipe_name}.{getattr(self, 'step_name', '<unknown>')}, a worker method must " + "be defined if created from a class" + ) + + def get_step_name(self, worker_object: Optional[MethodType]): + if not hasattr(self, "worker"): + if worker_object is None: + raise AttributeError(self.worker_unfindable_message) + step_name = getattr(self, "step_name", getattr(worker_object, "step_name", worker_object.__name__)) + else: + step_name = self.get_attribute_or_default("step_name", self.__class__.__name__) + step_name = to_snake_case(step_name) + + if not step_name: + raise ValueError(f'Step name in {self.pipe.pipe_name} cannot be an empty string "" or None') + + return step_name + @property def requirement_stack(self) -> Callable: """Return a partial function that calls the get_requirement_stack method of the pipeline @@ -226,7 +234,7 @@ class BaseStep: The wrapped function that saves the data using the disk class. """ - @wraps(self.pipe.disk_class.save) + @wraps(self.disk_class.save) def wrapper(session, data, extra=None): """Wrapper function to save data to disk. @@ -262,7 +270,7 @@ class BaseStep: The wrapped function for loading disk objects. """ - @wraps(self.pipe.disk_class.load) + @wraps(self.disk_class.load) def wrapper(session, extra=None, strict=False) -> Any: """Wrapper function to load disk object with session and optional extra parameters. @@ -366,7 +374,7 @@ class BaseStep: """ if extra is None: extra = self.get_default_extra() - return self.pipe.disk_class(session, self, extra) + return self.disk_class(session, self, extra) @property def generation_mechanism(self): diff --git a/tests/test_core.py b/tests/test_core.py index 572cd4f33879e6dffec671468a48cdaae1d18415..cf7eb1a64b39e078e20ee812ac402fedb303d786 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -16,10 +16,25 @@ from pypelines.pickle_backend import PicklePipe from pathlib import Path +@pytest.fixture +def pipeline_method_based(): + + test_pipeline = Pipeline("test_method_based") + + @test_pipeline.register_pipe + class MyPipe(PicklePipe): + + @stepmethod(requires=[]) + def my_step(self, session, extra=""): + return "a_good_result" + + return test_pipeline + + @pytest.fixture def pipeline_steps_group_class_based(): - test_pipeline = Pipeline("test_class_based") + test_pipeline = Pipeline("test_group_based") @test_pipeline.register_pipe class MyPipe(PicklePipe): @@ -33,22 +48,22 @@ def pipeline_steps_group_class_based(): @pytest.fixture -def pipeline_method_based(): +def pipeline_class_based(): - test_pipeline = Pipeline("test_method_based") + test_pipeline = Pipeline("test_class_based") @test_pipeline.register_pipe class MyPipe(PicklePipe): - @stepmethod(requires=[]) - def my_step(self, session, extra=""): - return "a_good_result" + class MyStep(BaseStep): + def worker(self, session, extra=""): + return "a_good_result" return test_pipeline def get_pipelines_fixtures(): - return ["pipeline_method_based", "pipeline_steps_group_class_based"] + return ["pipeline_method_based", "pipeline_steps_group_class_based", "pipeline_class_based"] @pytest.fixture @@ -85,6 +100,7 @@ def test_pypeline_creation(request, pipeline_fixture_name): assert hasattr(pipeline.my_pipe.my_step, "generate") assert hasattr(pipeline.my_pipe.my_step, "load") assert hasattr(pipeline.my_pipe.my_step, "save") + assert len(pipeline.my_pipe.steps) == 1 @pytest.mark.parametrize("pipeline_fixture_name", get_pipelines_fixtures())