Skip to content
Snippets Groups Projects
Commit c733c538 authored by Timothe Jost's avatar Timothe Jost
Browse files

testing to add a better type hinting support means to create a pypeline

parent 6b366d16
No related branches found
No related tags found
No related merge requests found
__version__ = "0.0.66"
__version__ = "0.0.67"
from . import loggs
from .pipes import *
......
......@@ -10,7 +10,11 @@ if TYPE_CHECKING:
from .graphs import PipelineGraph
class Pipeline:
class BasePipelineType(Protocol):
def __getattr__(self, name: str) -> "BasePipe": ...
class Pipeline(BasePipelineType):
pipes: Dict[str, "BasePipe"]
runner_backend_class = BaseTaskBackend
......@@ -61,7 +65,8 @@ class Pipeline:
return pipe_class
def resolve_instance(self, instance_name: str) -> "BaseStep":
"""Resolve the specified instance name to a BaseStep object.
"""Resolve the specified step instance name to a BaseStep object,
looking at the pipe and step names separated by a comma.
Args:
instance_name (str): The name of the instance in the format 'pipe_name.step_name'.
......
......@@ -68,9 +68,22 @@ class BasePipe(BasePipeType, metaclass=ABCMeta):
self.pipe_name = self.__class__.__name__
_steps: Dict[str, MethodType] = {}
steps_members_scanner = inspect.getmembers(self, predicate=inspect.ismethod)
requires_is_step_attr = True
for class_name, class_object in inspect.getmembers(self, predicate=inspect.isclass):
print(class_name, class_object)
if class_name == "Steps":
print("FOUND")
steps_members_scanner = inspect.getmembers(class_object(), predicate=inspect.ismethod)
requires_is_step_attr = False
break
# this loop populates self.steps dictionnary from the instanciated (bound) step methods.
for step_name, step in inspect.getmembers(self, predicate=inspect.ismethod):
if getattr(step, "is_step", False):
for step_name, step in steps_members_scanner:
print("step:", step_name)
if not requires_is_step_attr or getattr(step, "is_step", False):
_steps[step_name] = step
if len(_steps) < 1:
......@@ -86,7 +99,7 @@ class BasePipe(BasePipeType, metaclass=ABCMeta):
number_of_steps_with_requirements = 0
for step in _steps.values():
if len(step.requires):
if len(getattr(step, "requires", [])):
number_of_steps_with_requirements += 1
if number_of_steps_with_requirements < len(_steps) - 1:
......@@ -99,7 +112,7 @@ class BasePipe(BasePipeType, metaclass=ABCMeta):
# They must inherit from BaseStep
self.steps = {}
for step_name, step in _steps.items():
step = self.step_class(self.pipeline, self, step) # , step_name)
step = self.step_class(self.pipeline, self, step, step_name=step_name) # , step_name)
self.steps[step_name] = step # replace the bound_method by a step_class using that bound method,
# so that we attach the necessary components to it.
setattr(self, step_name, step)
......
......@@ -77,12 +77,7 @@ class BaseStep:
pipe: "BasePipe"
pipeline: "Pipeline"
def __init__(
self,
pipeline: "Pipeline",
pipe: "BasePipe",
worker: MethodType,
):
def __init__(self, pipeline: "Pipeline", pipe: "BasePipe", worker: MethodType, step_name=None):
"""Initialize a BaseStep object.
Args:
......@@ -111,12 +106,15 @@ class BaseStep:
# we attach the values of the worker elements to BaseStep
# as they are get only (no setter) on worker (bound method)
self.do_dispatch = self.worker.do_dispatch
self.version = self.worker.version
self.requires = self.worker.requires
self.step_name = self.worker.step_name
self.do_dispatch = getattr(self.worker, "do_dispatch", False)
self.version = getattr(self.worker, "version", 0)
self.requires = getattr(self.worker, "requires", [])
self.step_name = getattr(self.worker, "step_name", step_name)
self.callbacks = self.worker.callbacks
if not self.step_name:
raise ValueError("Step name cannot be blank nor None")
self.callbacks = getattr(self.worker, "callbacks", [])
self.worker = MethodType(worker.__func__, self)
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment