diff --git a/pypelines/__init__.py b/pypelines/__init__.py new file mode 100644 index 0000000000000000000000000000000000000000..134c1697b67ddffe61701c0b7b617af4c42023d0 --- /dev/null +++ b/pypelines/__init__.py @@ -0,0 +1,2 @@ +__version__ = "0.0.1" + diff --git a/pypelines/multisession.py b/pypelines/multisession.py new file mode 100644 index 0000000000000000000000000000000000000000..f3d62cd102acd7f2a453a914a253bb7462fb6d38 --- /dev/null +++ b/pypelines/multisession.py @@ -0,0 +1,6 @@ + + +class BaseMultisessionAccessor: + + def __init__(self, parent_step): + pass \ No newline at end of file diff --git a/pypelines/pipe.py b/pypelines/pipe.py new file mode 100644 index 0000000000000000000000000000000000000000..0f8d8580429088dcd5d6a524a97def51919c847e --- /dev/null +++ b/pypelines/pipe.py @@ -0,0 +1,102 @@ +from . step import BaseStep, step +from . multisession import BaseMultisessionAccessor + +from typing import Callable, Type, Iterable + +class PipeMetaclass(type): + + def __new__(cls : Type, pipe_name : str, bases : Iterable[Type], attributes : dict) -> Type: + attributes["pipe_name"] = pipe_name + + steps = {} + # this loop allows to populate cls.steps from the unistanciated the step methods of the cls. + for name, attribute in attributes.items(): + if getattr(attribute, "is_step", False): + steps[name] = PipeMetaclass.make_step_attributes(attribute , pipe_name , name) + + if len(attributes["steps"]) > 1 and attributes["single_step"]: + raise ValueError(f"Cannot set single_step to True if you registered more than one step inside {pipe_name} class") + + attributes["steps"] = steps + + return super().__new__(cls, pipe_name, bases, attributes) + + @staticmethod + def make_step_attributes(step : Callable, pipe_name : str, step_name : str) -> Callable: + + setattr(step, "pipe_name", pipe_name) + setattr(step, "step_name", step_name) + + return step + +class BasePipe(metaclass = PipeMetaclass): + # this class must implements only the logic to link blocks together. + # It is agnostic about what way data is stored, and the way the blocks function. + # Hence it is designed to be overloaded, and cannot be used as is. + + use_versions = True + single_step = False + step_class = BaseStep + multisession_class = BaseMultisessionAccessor + + def __init__(self, parent_pipeline : BasePipeline) -> None : + + self.multisession = self.multisession_class(self) + self.pipeline = parent_pipeline + + # this loop allows to populate self.steps from the now instanciated version of the step method. + # Using only instanciated version is important to be able to use self into it later, + # without confusing ourselved with uninstanciated versions in the steps dict + for step_name, _ in self.steps.items(): + step = getattr(self , step_name) # get the instanciated step method from name. + step = self.step_class(self.pipeline, self, step, step_name) + self.steps[step_name] = step + setattr(self, step_name, step) + + # attaches itself to the parent pipeline + if self.single_step : + step = list(self.steps.values())[0] + self.pipeline.pipes[self.pipe_name] = step + setattr(self.pipeline, self.pipe_name, step) + else : + self.pipeline.pipes[self.pipe_name] = self + setattr(self.pipeline, self.pipe_name, self) + + def __repr__(self) -> str: + return f"<{self.__class__.__bases__[0].__name__}.{self.pipe_name} PipeObject>" + + def file_getter(self, session, extra, version) -> | : + #finds file, opens it, and return data. + #if it cannot find the file, it returns a IOError + ... + #it will get + + def _check_version(self, step_name , found_version): + #checks the found_version of the file is above or equal in the requirement order, to the step we are looking for + ... + + def step_version(self, step): + #simply returns the current string of the version that is in . + ... + + def disk_version(self, session, extra) -> str : + #simply returns the version string of the file(s) that it found. + ... + + def file_saver(self, session, dumped_object, version ): + ... + + def file_loader(self, session, dumped_object, version ): + ... + + def file_checker(self, session): + ... + + def dispatcher(self, function): + # the dispatcher must be return a wrapped function + ... + +class ExamplePipe(BasePipe): + + @step(requires = []) + def diff --git a/pypelines/pipeline.py b/pypelines/pipeline.py new file mode 100644 index 0000000000000000000000000000000000000000..274307af9912155a0f1a0cc128ba10fa498a4f0b --- /dev/null +++ b/pypelines/pipeline.py @@ -0,0 +1,55 @@ + + + + + +class BasePipeline: + + pipes = {} + + def register_pipe(self,pipe_class): + pipe_class(self) + + return pipe_class + + def __init__(self, versions = None): + self.versions = versions + + def resolve(self, instance_name : str) : + pipe_name , step_name = instance_name.split(".") + try : + pipe = self.pipes[pipe_name] + if pipe.single_step : + return pipe + return pipe.steps[step_name] + except KeyError : + raise KeyError(f"No instance {instance_name} has been registered to the pipeline") + + def get_requirement_stack(self, instance, required_steps = None, parents = None, max_recursion = 100): + if required_steps is None : + required_steps = [] + + if parents is None: + parents = [] + + if isinstance(instance,str): + instance = self.resolve(instance) + + if instance in parents : + raise RecursionError(f"Circular import : {parents[-1]} requires {instance} wich exists in parents hierarchy : {parents}") + + parents.append(instance) + if len(parents) > max_recursion : + raise ValueError("Too much recursion, unrealistic number of pipes chaining. Investigate errors or increase max_recursion") + instanciated_requires = [] + + for requirement in instance.requires: + required_steps, requirement = self.get_requirement_stack(requirement, required_steps, parents, max_recursion) + if not requirement in required_steps: + required_steps.append(requirement) + instanciated_requires.append(requirement) + + instance.requires = instanciated_requires + parents.pop(-1) + + return required_steps, instance \ No newline at end of file diff --git a/pypelines/step.py b/pypelines/step.py new file mode 100644 index 0000000000000000000000000000000000000000..bcd0f0f4590bb9a7495445c398f2dba90373b472 --- /dev/null +++ b/pypelines/step.py @@ -0,0 +1,59 @@ +from functools import wraps, partial, update_wrapper + +def step(requires = []): + # This method allows to register class methods inheriting of BasePipe as steps. + # It basically just step an "is_step" stamp on the method that are defined as steps. + # This stamp will later be used in the metaclass __new__ to set additionnal usefull attributes to those methods + if not isinstance(requires, list): + requires = [requires] + + def registrate(function): + function.requires = requires + function.is_step = True + return function + return registrate + +class BaseStep: + + def __init__(self, pipeline, pipe, step, step_name): + self.pipeline = pipeline # save an instanciated access to the pipeline parent + self.pipe = pipe # save an instanciated access to the pipe parent + self.step = step # save an instanciated access to the step function (undecorated) + self.pipe_name = pipe.pipe_name + self.step_name = step_name + + self.single_step = self.pipe.single_step + self.requires = self.step.requires + self.is_step = True + + self.requirement_stack = partial( self.pipeline.get_requirement_stack, instance = self ) + self.step_version = partial( self.pipe.step_version, step = self ) + self.load = self._loading_wrapper(self.pipe.file_loader) + self.save = self._saving_wrapper(self.pipe.file_saver) + self.generate = self._generating_wrapper(self.step) + + update_wrapper(self, self.step) + + def __call__(self, *args, **kwargs): + return self.step(*args, **kwargs) + + def __repr__(self): + return f"<{self.pipe_name}.{self.step_name} StepObject>" + + def _saving_wrapper(self, function): + return self.pipe.dispatcher(self._version_wrapper(function, self.pipe.step_version)) + + def _loading_wrapper(self, function): + return self.pipe.dispatcher(self._version_wrapper(function, self.pipe.step_version)) + + def _generating_wrapper(self, function): + return self.pipe.dispatcher( + session_log_decorator + ) + + def _version_wrapper(self, function_to_wrap, version_getter): + @wraps(function_to_wrap) + def wrapper(*args,**kwargs): + version = version_getter(self) + return function_to_wrap(*args, version=version, **kwargs) + return wrapper diff --git a/pypelines/versions.json b/pypelines/versions.json new file mode 100644 index 0000000000000000000000000000000000000000..4f21efe5a0dd048f0e8621081a94890a4de13f1b --- /dev/null +++ b/pypelines/versions.json @@ -0,0 +1,40 @@ +{ + "other_steps": { + "versions": { + "azdaz1161": { + "deprecated": false, + "step_name": "step2", + "function_hash": "azdaz", + "creation_date": "2023-15-48 T 01:15:02.0515" + }, + "second_version_string": { + "deprecated": false, + "step_name": "step1", + "function_hash": "step1", + "creation_date": "2023-15-48 T 01:15:02.0515" + } + }, + "step_renamings": { + "old_step_name": "my_step_name" + } + }, + "demoPipeClass": { + "versions": { + "azbdauzbm1811": { + "deprecated": false, + "step_name": "initial", + "function_hash": "my_function_hash", + "creation_date": "2023-15-48 T 01:15:02.0515" + }, + "another_version_string": { + "deprecated": false, + "step_name": "step_name3", + "function_hash": "my_function_hash", + "creation_date": "2023-15-48 T 01:15:02.0515" + } + }, + "step_renamings": { + "old_step_name": "my_step_name" + } + } +} \ No newline at end of file diff --git a/pypelines/versions.py b/pypelines/versions.py new file mode 100644 index 0000000000000000000000000000000000000000..02cfdaed30a8a30a582252a809df33054d61aaf6 --- /dev/null +++ b/pypelines/versions.py @@ -0,0 +1,121 @@ +from dataclasses import dataclass +import hashlib, random, json, inspect, re + +@dataclass +class Version: + pipe_name : str + id : str + detail : dict + + @property + def deprecated(self) : + return self.detail["deprecated"] + + @property + def function_hash(self) : + return self.detail["function_hash"] + + @property + def step_name(self) : + return self.detail["step_name"] + + @property + def creation_date(self) : + return self.detail["creation_date"] + + def update_function_hash(self, new_function_hash): + self.detail["function_hash"] = new_function_hash + + def deprecate(self): + self.detail["deprecated"] = True + + def __str__(self): + return self.id + +class BaseVersionHandler: + + function_hash_remove = ["comments", " ", "\n"] + + def __init__(self, pipe, *args, **kwargs): + self.pipe = pipe + + def compare_function_hash(self, step): + try : + version = self.get_active_version(step) + except KeyError : + return False + current_hash = self.get_function_hash(step.step) + return version.function_hash == current_hash + + def get_function_hash(self, function) -> str : + + def remove_comments(self, source): + # remove all occurance of single-line comments (#comments) from the source + source_no_comments = re.sub(r'#[^\n]*', '', source) + # remove all occurance of multi-line comments ("""comment""") from the source + source_no_comments = re.sub(r'\'\'\'.*?\'\'\'', '', source_no_comments, flags=re.DOTALL) + source_no_comments = re.sub(r'\"\"\".*?\"\"\"', '', source_no_comments, flags=re.DOTALL) + return source_no_comments + + remove = self.function_hash_remove + source = inspect.getsource(function) + + if "comments" in remove : + remove.pop(remove.index("comments")) + source = remove_comments(source) + + for rem in remove : + source = source.replace(rem, "") + + return hashlib.sha256(source.encode()).hexdigest() + + def get_new_version_string(self) -> str : + ... + + def get_active_version(self, step) -> Version : + ... + + def apply_changes(self, versions) -> None : + ... + +class HashVersionHandler(BaseVersionHandler): + + hash_collision_max_attempts = 3 + + def __init__(self, pipe, file_path) : + super().__init__(pipe) + self.path = file_path + self.memory = json.load(open(file_path,"r")) + self.verify_structure() + + def get_new_version_string(self) -> str : + max_attempts = self.hash_collision_max_attempts + for i in range(max_attempts):# max no-collision attempts, then raises error + + m = hashlib.sha256() + r = str(random.random()).encode() + m.update(r) + new_hash = m.hexdigest()[0:7] + + if new_hash not in self.memory["versions"].keys(): + return new_hash + + raise ValueError("Could not determine a unique hash not colliding with existing values. Please investigate code / step_architecture.json file ?") + + def apply_changes(self, versions ): + if not isinstance(versions, list) : + versions = [versions] + + for version in versions : + try : + edited_object = self.memory["versions"][version.id] + except KeyError: + self.steps_dict[version.pipe_name] = self.steps_dict.get(version.pipe_name,{"versions":{},"step_renamings":{}}) + edited_object = self.steps_dict[version.pipe_name]["versions"][version.id] = self.steps_dict[version.pipe_name]["versions"].get(version.id,{}) + edited_object.update(version.detail) + + def verify_structure(self, pipeline): + for pipe_name, pipe in pipeline.pipes.items(): + for step_name, step in pipe.steps.items(): + pass + #in here, check function hash of the current implementation matches the one in the version, or send a warning to user that he may update the version or ignor by updating the function hash and keeping the same version \ No newline at end of file diff --git a/setup.py b/setup.py new file mode 100644 index 0000000000000000000000000000000000000000..2046f522bc9d8888293fe6637f3d68d81cf5999f --- /dev/null +++ b/setup.py @@ -0,0 +1,49 @@ +# -*- coding: utf-8 -*- +""" +Created on Wed Mar 15 21:34:34 2023 + +@author: tjostmou +""" + +from setuptools import setup, find_packages +from pathlib import Path + +def get_version(rel_path): + here = Path(__file__).parent.absolute() + with open(here.joinpath(rel_path), 'r') as fp: + for line in fp.read().splitlines(): + if line.startswith('__version__'): + delim = '"' if '"' in line else "'" + return line.split(delim)[1] + raise RuntimeError('Unable to find version string.') + +setup( + name= 'analines', + version= get_version(Path('pypelines', '__init__.py')), + packages=find_packages(), + url= 'https://gitlab.pasteur.fr/haisslab/data-management/pypelines', + license= 'MIT', + author= 'Timothé Jost-MOUSSEAU', + author_email= 'timothe.jost-mousseau@pasteur.com', + description= 'Image and video management for research use, in an unified easy to use API', + classifiers=[ + 'Development Status :: 3 - Alpha', + 'Intended Audience :: Developers', + 'License :: OSI Approved :: MIT License', + 'Programming Language :: Python :: 3', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', + 'Programming Language :: Python :: 3.11', + ], + install_requires=[ + "numpy>=1.23", + "opencv-python>=4.6", + "ffmpeg>=1.4", + "tifffile>=2022.10" + ], + entry_points={}, + scripts={}, +) \ No newline at end of file