Skip to content
Snippets Groups Projects
Commit 5da6ca14 authored by Timothe Jost's avatar Timothe Jost
Browse files

better type hinting

parent 487ab746
No related branches found
No related tags found
No related merge requests found
Pipeline #124145 passed
__version__ = "0.0.21"
__version__ = "0.0.22"
from . import loggs
from .pipes import *
......
from typing import Callable, Type, Iterable, Protocol, TYPE_CHECKING
from typing import Callable, Type, Dict, Iterable, Protocol, TYPE_CHECKING
import os
......@@ -10,6 +10,7 @@ if TYPE_CHECKING:
class Pipeline:
use_celery = False
pipes: Dict[str, BasePipe]
def __init__(self, name: str, conf_path=None, use_celery=False):
self.pipeline_name = name
......@@ -20,7 +21,7 @@ class Pipeline:
if use_celery:
self.configure_celery()
def register_pipe(self, pipe_class: "BasePipe") -> "BasePipe":
def register_pipe(self, pipe_class: Type["BasePipe"]) -> Type["BasePipe"]:
"""Wrapper to instanciate and attache a a class inheriting from BasePipe it to the Pipeline instance.
The Wraper returns the class without changing it."""
instance = pipe_class(self)
......@@ -43,7 +44,7 @@ class Pipeline:
self.resolved = False
return pipe_class
def resolve_instance(self, instance_name: str) -> "BaseStep":
def resolve_instance(self, instance_name: str) -> Type["BaseStep"]:
pipe_name, step_name = instance_name.split(".")
try:
pipe = self.pipes[pipe_name]
......
......@@ -8,7 +8,8 @@ import inspect, hashlib
from abc import ABCMeta, abstractmethod
from typing import Callable, Type, Iterable, Protocol, TYPE_CHECKING, Literal
from typing import Callable, Type, Iterable, Protocol, TYPE_CHECKING, Literal, Dict
from types import MethodType
if TYPE_CHECKING:
from .pipelines import Pipeline
......@@ -21,38 +22,40 @@ class BasePipe(metaclass=ABCMeta):
single_step: bool = False # a flag to tell the initializer to bind the unique step of this pipe in place
# of the pipe itself, to the registered pipeline.
step_class: BaseStep = BaseStep
disk_class: BaseDiskObject = BaseDiskObject
multisession_class: BaseMultisessionAccessor = BaseMultisessionAccessor
step_class: Type[BaseStep] = BaseStep
disk_class: Type[BaseDiskObject] = BaseDiskObject
multisession_class: Type[BaseMultisessionAccessor] = BaseMultisessionAccessor
steps: Dict[str, BaseStep]
def __init__(self, parent_pipeline: "Pipeline") -> None:
self.pipeline = parent_pipeline
self.pipe_name = self.__class__.__name__
self.steps = {}
_steps: Dict[str, MethodType] = {}
# this loop populates self.steps dictionnary from the instanciated (bound) step methods.
for step_name, step in inspect.getmembers(self, predicate=inspect.ismethod):
if getattr(step, "is_step", False):
self.steps[step_name] = step
_steps[step_name] = step
if len(self.steps) < 1:
if len(_steps) < 1:
raise ValueError(
f"You should register at least one step class with @stepmethod in {self.pipe_name} class."
f" { self.steps = }"
f" { _steps = }"
)
if len(self.steps) > 1 and self.single_step:
if len(_steps) > 1 and self.single_step:
raise ValueError(
f"Cannot set single_step to True if you registered more than one step inside {self.pipe_name} class."
f" { self.steps = }"
f" { _steps = }"
)
number_of_steps_with_requirements = 0
for step in self.steps.values():
for step in _steps.values():
if len(step.requires):
number_of_steps_with_requirements += 1
if number_of_steps_with_requirements < len(self.steps) - 1:
if number_of_steps_with_requirements < len(_steps) - 1:
raise ValueError(
"Steps of a single pipe must be linked in hierarchical order : Cannot have a single pipe with N steps"
" (N>1) and have no `requires` specification for at least N-1 steps."
......@@ -60,7 +63,8 @@ class BasePipe(metaclass=ABCMeta):
# this loop populates self.steps and replacs the bound methods with usefull Step objects.
# They must inherit from BaseStep
for step_name, step in self.steps.items():
self.steps = {}
for step_name, step in _steps.items():
step = self.step_class(self.pipeline, self, step) # , step_name)
self.steps[step_name] = step # replace the bound_method by a step_class using that bound method,
# so that we attach the necessary components to it.
......
......@@ -53,6 +53,7 @@ def stepmethod(requires=[], version=None, do_dispatch=True, on_save_callbacks=[]
class BaseStep:
def __init__(
self,
pipeline: "Pipeline",
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment