Skip to content
Snippets Groups Projects
Commit 26a1e452 authored by Timothe Jost's avatar Timothe Jost
Browse files

almost working base feature set

parent 012b0095
Branches
No related tags found
No related merge requests found
Source diff could not be displayed: it is too large. Options to address this: view the blob.
from . step import BaseStep from . step import BaseStep
from . multisession import BaseMultisessionAccessor from . multisession import BaseMultisessionAccessor
from . sessions import Session from . sessions import Session
from . disk import PickleObject
from functools import wraps from functools import wraps
import inspect import inspect
...@@ -10,37 +11,37 @@ from typing import Callable, Type, Iterable, Protocol, TYPE_CHECKING ...@@ -10,37 +11,37 @@ from typing import Callable, Type, Iterable, Protocol, TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from .pipeline import BasePipeline from .pipeline import BasePipeline
class PipeMetaclass(type): # class PipeMetaclass(type):
def __new__(cls : Type, pipe_name : str, bases : Iterable[Type], attributes : dict) -> Type: # def __new__(cls : Type, pipe_name : str, bases : Iterable[Type], attributes : dict) -> Type:
return super().__new__(cls, pipe_name, bases, attributes) # return super().__new__(cls, pipe_name, bases, attributes)
def __init__(cls : Type, pipe_name : str, bases : Iterable[Type], attributes : dict) -> None: # def __init__(cls : Type, pipe_name : str, bases : Iterable[Type], attributes : dict) -> None:
steps = getattr(cls,"steps",{}) # steps = getattr(cls,"steps",{})
for name, attribute in attributes.items(): # for name, attribute in attributes.items():
if getattr(attribute, "is_step", False): # if getattr(attribute, "is_step", False):
steps[name] = PipeMetaclass.step_with_attributes(attribute , pipe_name , name) # steps[name] = PipeMetaclass.step_with_attributes(attribute , pipe_name , name)
setattr(cls,"steps",steps) # setattr(cls,"steps",steps)
@staticmethod # @staticmethod
def step_with_attributes(step : BaseStep, pipe_name : str, step_name : str) -> BaseStep: # def step_with_attributes(step : BaseStep, pipe_name : str, step_name : str) -> BaseStep:
setattr(step, "pipe_name", pipe_name) # setattr(step, "pipe_name", pipe_name)
setattr(step, "step_name", step_name) # setattr(step, "step_name", step_name)
return step # return step
class BasePipe(metaclass = PipeMetaclass): class BasePipe :#(metaclass = PipeMetaclass):
# this class must implements only the logic to link blocks together. # this class must implements only the logic to link blocks together.
# It is agnostic about what way data is stored, and the way the blocks function. # It is agnostic about what way data is stored, and the way the blocks function.
# Hence it is designed to be overloaded, and cannot be used as is. # Hence it is designed to be overloaded, and cannot be used as is.
use_versions = True
single_step = False single_step = False
step_class = BaseStep step_class = BaseStep
disk_class = PickleObject
multisession_class = BaseMultisessionAccessor multisession_class = BaseMultisessionAccessor
def __init__(self, parent_pipeline : "BasePipeline") -> None : def __init__(self, parent_pipeline : "BasePipeline") -> None :
...@@ -48,25 +49,36 @@ class BasePipe(metaclass = PipeMetaclass): ...@@ -48,25 +49,36 @@ class BasePipe(metaclass = PipeMetaclass):
self.multisession = self.multisession_class(self) self.multisession = self.multisession_class(self)
self.pipeline = parent_pipeline self.pipeline = parent_pipeline
self.pipe_name = self.__class__.__name__ self.pipe_name = self.__class__.__name__
print(self.pipe_name) #print(self.pipe_name)
self.steps = {}
for (step_name, step) in inspect.getmembers( self , predicate = inspect.ismethod ):
if getattr(step, "is_step", False):
self.steps[step_name] = step
if len(self.steps) < 1 :
raise ValueError(f"You should register at least one step class with @stepmethod in {self.pipe_name} class. { self.steps = }")
if len(self.steps) > 1 and self.single_step: if len(self.steps) > 1 and self.single_step:
raise ValueError(f"Cannot set single_step to True if you registered more than one step inside {self.pipe_name} class") raise ValueError(f"Cannot set single_step to True if you registered more than one step inside {self.pipe_name} class. { self.steps = }")
#if self.single_step is None :
# self.single_step = False if len(self.steps) > 1 else True
# this loop allows to populate self.steps from the now instanciated version of the step method. # this loop allows to populate self.steps from the now instanciated version of the step method.
# Using only instanciated version is important to be able to use self into it later, # Using only instanciated version is important to be able to use self into it later,
# without confusing ourselved with uninstanciated versions in the steps dict # without confusing ourselved with uninstanciated versions in the steps dict
for step_name, _ in self.steps.items(): for step_name, step in self.steps.items():
step = getattr(self , step_name) # get the instanciated step method from name.
step = self.step_class(self.pipeline, self, step, step_name) step = self.step_class(self.pipeline, self, step, step_name)
self.steps[step_name] = step self.steps[step_name] = step #replace the bound_method by a step_class using that bound method, so that we attach the necessary components to it.
setattr(self, step_name, step) setattr(self, step_name, step)
# attaches itself to the parent pipeline # attaches itself to the parent pipeline
if self.single_step : if self.single_step :
step = list(self.steps.values())[0] step = list(self.steps.values())[0]
self.pipeline.pipes[self.pipe_name] = step self.pipeline.pipes[self.pipe_name] = step
step.steps = self.steps #just add steps to this step serving as a pipe, so that it behaves similarly to a pipe for some pipelines function requiring this attribute to exist.
setattr(self.pipeline, self.pipe_name, step) setattr(self.pipeline, self.pipe_name, step)
else : else :
self.pipeline.pipes[self.pipe_name] = self self.pipeline.pipes[self.pipe_name] = self
......
...@@ -7,16 +7,20 @@ if TYPE_CHECKING: ...@@ -7,16 +7,20 @@ if TYPE_CHECKING:
class BasePipeline: class BasePipeline:
pipes = {} def __init__(self,name):
self.pipeline_name = name
self.pipes = {}
self.resolved = False
def register_pipe(self, pipe_class : type) -> type: def register_pipe(self, pipe_class : type) -> type:
"""Wrapper to instanciate and attache a a class inheriting from BasePipe it to the Pipeline instance. """Wrapper to instanciate and attache a a class inheriting from BasePipe it to the Pipeline instance.
The Wraper returns the class without changing it.""" The Wraper returns the class without changing it."""
pipe_class(self) instance = pipe_class(self)
#print(f"Added instance of Pipe {instance.pipe_name} to instance of Pipeline {self.__class__.__name__} {self = }")
self.resolved = False
return pipe_class return pipe_class
def resolve(self, instance_name : str) : def resolve_instance(self, instance_name : str) :
pipe_name , step_name = instance_name.split(".") pipe_name , step_name = instance_name.split(".")
try : try :
pipe = self.pipes[pipe_name] pipe = self.pipes[pipe_name]
...@@ -26,15 +30,31 @@ class BasePipeline: ...@@ -26,15 +30,31 @@ class BasePipeline:
except KeyError : except KeyError :
raise KeyError(f"No instance {instance_name} has been registered to the pipeline") raise KeyError(f"No instance {instance_name} has been registered to the pipeline")
def resolve(self):
if self.resolved:
return
for pipe in self.pipes.values() :
for step in pipe.steps.values() :
instanciated_requires = []
for req in step.requires :
if isinstance(req,str):
req = self.resolve_instance(req)
instanciated_requires.append(req)
step.requires = instanciated_requires
self.resolved = True
def get_requirement_stack(self, instance, required_steps = None, parents = None, max_recursion = 100): def get_requirement_stack(self, instance, required_steps = None, parents = None, max_recursion = 100):
if required_steps is None : if required_steps is None :
required_steps = [] required_steps = []
if parents is None: if parents is None:
parents = [] parents = []
if isinstance(instance,str): self.resolve()
instance = self.resolve(instance)
if instance in parents : if instance in parents :
raise RecursionError(f"Circular import : {parents[-1]} requires {instance} wich exists in parents hierarchy : {parents}") raise RecursionError(f"Circular import : {parents[-1]} requires {instance} wich exists in parents hierarchy : {parents}")
...@@ -42,15 +62,31 @@ class BasePipeline: ...@@ -42,15 +62,31 @@ class BasePipeline:
parents.append(instance) parents.append(instance)
if len(parents) > max_recursion : if len(parents) > max_recursion :
raise ValueError("Too much recursion, unrealistic number of pipes chaining. Investigate errors or increase max_recursion") raise ValueError("Too much recursion, unrealistic number of pipes chaining. Investigate errors or increase max_recursion")
instanciated_requires = []
for requirement in instance.requires: for requirement in instance.requires:
required_steps, requirement = self.get_requirement_stack(requirement, required_steps, parents, max_recursion) required_steps, requirement = self.get_requirement_stack(requirement, required_steps, parents, max_recursion)
if not requirement in required_steps: if not requirement in required_steps:
required_steps.append(requirement) required_steps.append(requirement)
instanciated_requires.append(requirement)
instance.requires = instanciated_requires
parents.pop(-1) parents.pop(-1)
return required_steps, instance return required_steps, instance
def get_graph(self):
from networkx import DiGraph
self.resolve()
callable_graph = DiGraph()
display_graph = DiGraph()
for pipe in self.pipes.values() :
for step in pipe.steps.values() :
callable_graph.add_node(step)
display_graph.add_node(step.full_name)
for req in step.requires :
callable_graph.add_edge(req, step)
display_graph.add_edge(req.full_name, step.full_name)
return callable_graph, display_graph
\ No newline at end of file
...@@ -8,28 +8,39 @@ from typing import Callable, Type, Iterable, Protocol, TYPE_CHECKING ...@@ -8,28 +8,39 @@ from typing import Callable, Type, Iterable, Protocol, TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
from .pipeline import BasePipeline from .pipeline import BasePipeline
from .pipe import BasePipe from .pipe import BasePipe
from .disk import BaseDiskObject
def stepmethod(requires=[], version=None): def stepmethod(requires=[], version=None):
# This method allows to register class methods inheriting of BasePipe as steps. # This method allows to register class methods inheriting of BasePipe as steps.
# It basically just step an "is_step" stamp on the method that are defined as steps. # It basically just step an "is_step" stamp on the method that are defined as steps.
# This stamp will later be used in the metaclass __new__ to set additionnal usefull attributes to those methods # This stamp will later be used in the metaclass __new__ to set additionnal usefull attributes to those methods
def registrate(function): def registrate(function):
function.requires = [requires] if not isinstance(requires, list) else requires function.requires = [requires] if not isinstance(requires, list) else requires
function.is_step = True function.is_step = True
function.use_version = False if version is None else True function.use_version = False if version is None else True
function.version = version function.version = version
return function return function
return registrate return registrate
class BaseStep:
def __init__(self, pipeline : "BasePipeline", pipe : "BasePipe", step : "BaseStep", step_name : str): class BaseStep:
def __init__(
self,
pipeline: "BasePipeline",
pipe: "BasePipe",
step: "BaseStep",
step_name: str,
):
self.pipeline = pipeline # save an instanciated access to the pipeline parent self.pipeline = pipeline # save an instanciated access to the pipeline parent
self.pipe = pipe # save an instanciated access to the pipe parent self.pipe = pipe # save an instanciated access to the pipe parent
self.step = step # save an instanciated access to the step function (undecorated) self.step = (
step # save an instanciated access to the step function (undecorated)
)
self.pipe_name = pipe.pipe_name self.pipe_name = pipe.pipe_name
self.step_name = step_name self.step_name = step_name
self.full_name = f"{self.pipe_name}.{self.step_name}"
self.use_version = self.step.use_version self.use_version = self.step.use_version
self.version = self.step.version self.version = self.step.version
...@@ -37,8 +48,10 @@ class BaseStep: ...@@ -37,8 +48,10 @@ class BaseStep:
self.requires = self.step.requires self.requires = self.step.requires
self.is_step = True self.is_step = True
self.requirement_stack = partial( self.pipeline.get_requirement_stack, instance = self ) self.requirement_stack = partial(
self.step_version = partial( self.pipe.step_version, step = self ) self.pipeline.get_requirement_stack, instance=self
)
# self.step_version = partial( self.pipe.step_version, step = self )
update_wrapper(self, self.step) update_wrapper(self, self.step)
self._make_wrapped_functions() self._make_wrapped_functions()
...@@ -55,25 +68,34 @@ class BaseStep: ...@@ -55,25 +68,34 @@ class BaseStep:
self.make_wrapped_generate() self.make_wrapped_generate()
def make_wrapped_save(self): def make_wrapped_save(self):
self.save = self.pipe.dispatcher(self._version_wrapper(self.pipe.file_saver)) def wrapper(session, data, extra=""):
disk_object = self.pipe.disk_class(session, self, extra=extra)
disk_object.check_disk()
return disk_object.save(data)
self.save = self.pipe.dispatcher(wrapper)
def make_wrapped_load(self): def make_wrapped_load(self):
self.load = self.pipe.dispatcher(self._version_wrapper(self.pipe.file_loader)) def wrapper(session, extra=""):
disk_object = self.pipe.disk_class(session, self, extra=extra)
disk_object.check_disk()
return disk_object.load()
self.load = self.pipe.dispatcher(wrapper)
def make_wrapped_generate(self): def make_wrapped_generate(self):
self.generate = loggedmethod( def wrapper(session, *args, extra="", **kwargs):
self._version_wrapper( disk_object = self.pipe.disk_class(session, self, extra=extra)
self.pipe.dispatcher( return loggedmethod(
self._load_or_generate_wrapper( self._load_or_generate_wrapper(
self._save_after_generate_wrapper( self._save_after_generate_wrapper(
self.pipe.pre_run_wrapper(self.step) self.pipe.pre_run_wrapper(self.step), disk_object
) ),
) disk_object,
)
)
) )
)(session, *args, extra = extra, **kwargs)
self.generate = self.pipe.dispatcher(wrapper)
def step_current_version(self) -> str: def step_current_version(self) -> str:
# simply returns the current string of the version that is in the config file. # simply returns the current string of the version that is in the config file.
...@@ -85,9 +107,12 @@ class BaseStep: ...@@ -85,9 +107,12 @@ class BaseStep:
def wrapper(*args, **kwargs): def wrapper(*args, **kwargs):
version = self.step_current_version(self) version = self.step_current_version(self)
return function(*args, version=version, **kwargs) return function(*args, version=version, **kwargs)
return wrapper return wrapper
def _load_or_generate_wrapper(self, function: Callable): def _load_or_generate_wrapper(
self, function: Callable, disk_object: "BaseDiskObject"
):
""" """
Decorator to load instead of calculating if not refreshing and saved data exists Decorator to load instead of calculating if not refreshing and saved data exists
""" """
...@@ -114,7 +139,7 @@ class BaseStep: ...@@ -114,7 +139,7 @@ class BaseStep:
kwargs = kwargs.copy() kwargs = kwargs.copy()
extra = kwargs.get("extra", "") extra = kwargs.get("extra", "")
version = kwargs.get("version", "") # version = kwargs.get("version", "")
skipping = kwargs.pop("skip", False) skipping = kwargs.pop("skip", False)
# we raise if file not found only if skipping is True # we raise if file not found only if skipping is True
refresh = kwargs.get("refresh", False) refresh = kwargs.get("refresh", False)
...@@ -136,35 +161,38 @@ class BaseStep: ...@@ -136,35 +161,38 @@ class BaseStep:
) )
if not refresh: if not refresh:
if skipping and self.pipe.file_checker(session_details, extra=extra, version=version): if disk_object.check_disk() and skipping:
logger.load_info( logger.info(
f"File exists for {self.pipe_name}{'.' + extra if extra else ''}. Loading and processing have been skipped" f"File exists for {self.pipe_name}{'.' + extra if extra else ''}. Loading and processing have been skipped"
) )
return None return None
logger.debug(f"Trying to load saved data") logger.debug(f"Trying to load saved data")
try: try:
result = self.pipe.file_loader(session_details, extra=extra, version=version) result = (
logger.load_info( disk_object.load()
) # self.pipe.file_loader(session_details, extra=extra, version=version)
logger.info(
f"Found and loaded {self.pipe_name}{'.' + extra if extra else ''} file. Processing has been skipped " f"Found and loaded {self.pipe_name}{'.' + extra if extra else ''} file. Processing has been skipped "
) )
return result return result
except IOError: except IOError:
logger.load_info( logger.info(
f"Could not find or load {self.pipe_name}{'.' + extra if extra else ''} saved file." f"Could not find or load {self.pipe_name}{'.' + extra if extra else ''} saved file."
) )
logger.load_info( logger.info(
f"Performing the computation to generate {self.pipe_name}{'.' + extra if extra else ''}. Hold tight." f"Performing the computation to generate {self.pipe_name}{'.' + extra if extra else ''}. Hold tight."
) )
return function(session_details, *args, **kwargs) return function(session_details, *args, **kwargs)
return wrap return wrap
def _save_after_generate_wrapper(self, function: Callable): def _save_after_generate_wrapper(
self, function: Callable, disk_object: "BaseDiskObject"
):
# decorator to load instead of calculating if not refreshing and saved data exists # decorator to load instead of calculating if not refreshing and saved data exists
@wraps(function) @wraps(function)
def wrap(session, *args, **kwargs): def wrap(session, *args, **kwargs):
logger = logging.getLogger("save_pipeline") logger = logging.getLogger("save_pipeline")
kwargs = kwargs.copy() kwargs = kwargs.copy()
...@@ -176,7 +204,8 @@ class BaseStep: ...@@ -176,7 +204,8 @@ class BaseStep:
if session is not None: if session is not None:
if save_pipeline: if save_pipeline:
# we overwrite inside saver, if file exists and save_pipeline is True # we overwrite inside saver, if file exists and save_pipeline is True
self.pipe.file_saver(session, result, extra=extra, version=version) disk_object.save(result)
# self.pipe.file_saver(session, result, extra=extra, version=version)
else: else:
logger.warning( logger.warning(
f"Cannot guess data saving location for {self.pipe_name}: 'session_details' argument must be supplied." f"Cannot guess data saving location for {self.pipe_name}: 'session_details' argument must be supplied."
......
call conda activate Inflow
cd ..
pip install -e .
PAUSE
\ No newline at end of file
...@@ -18,14 +18,14 @@ def get_version(rel_path): ...@@ -18,14 +18,14 @@ def get_version(rel_path):
raise RuntimeError('Unable to find version string.') raise RuntimeError('Unable to find version string.')
setup( setup(
name= 'analines', name= 'pypelines',
version= get_version(Path('pypelines', '__init__.py')), version= get_version(Path('pypelines', '__init__.py')),
packages=find_packages(), packages=find_packages(),
url= 'https://gitlab.pasteur.fr/haisslab/data-management/pypelines', url= 'https://gitlab.pasteur.fr/haisslab/data-management/pypelines',
license= 'MIT', license= 'MIT',
author= 'Timothé Jost-MOUSSEAU', author= 'Timothé Jost-MOUSSEAU',
author_email= 'timothe.jost-mousseau@pasteur.com', author_email= 'timothe.jost-mousseau@pasteur.com',
description= 'Framework to organize pprocessing files outputs.', description= 'Framework to organize processing code outputs to/from disk, processing chaining and versionning with a common easy to use api',
classifiers=[ classifiers=[
'Development Status :: 3 - Alpha', 'Development Status :: 3 - Alpha',
'Intended Audience :: Developers', 'Intended Audience :: Developers',
...@@ -39,10 +39,6 @@ setup( ...@@ -39,10 +39,6 @@ setup(
'Programming Language :: Python :: 3.11', 'Programming Language :: Python :: 3.11',
], ],
install_requires=[ install_requires=[
"numpy>=1.23",
"opencv-python>=4.6",
"ffmpeg>=1.4",
"tifffile>=2022.10"
], ],
entry_points={}, entry_points={},
scripts={}, scripts={},
......
No preview for this file type
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment