Skip to content
Snippets Groups Projects
Commit cbce7365 authored by Timothe Jost's avatar Timothe Jost
Browse files

typing

parent d5d264c2
No related branches found
No related tags found
No related merge requests found
Pipeline #156236 passed
__version__ = "0.0.80" __version__ = "0.0.81"
from . import loggs from . import loggs
from .pipes import * from .pipes import *
...@@ -7,6 +7,8 @@ from .steps import * ...@@ -7,6 +7,8 @@ from .steps import *
from .disk import * from .disk import *
from .sessions import * from .sessions import *
from .extend_pandas import extend_pandas
# NOTE: # NOTE:
# pypelines is enabling the logging system by default when importing it # pypelines is enabling the logging system by default when importing it
# (it comprises colored logging, session prefix-logging, and logging to a file located in downloads folder) # (it comprises colored logging, session prefix-logging, and logging to a file located in downloads folder)
......
import pandas as pd
from ..pipelines import Pipeline
from .typing import SessionPipelineAccessorProto
# This is only for type checkers, has no runtime effect
pd.DataFrame.pypeline: SessionPipelineAccessorProto
def extend_pandas():
if not hasattr(pd.DataFrame, "_pypelines_accessor_registered"):
@pd.api.extensions.register_dataframe_accessor("pypeline")
class SessionPipelineAccessor:
def __init__(self, pandas_obj: pd.DataFrame):
self._obj = pandas_obj
def __call__(self, pipeline: Pipeline):
self.pipeline = pipeline
return self
def output_exists(self, pipe_step_name: str):
names = pipe_step_name.split(".")
if len(names) == 1:
pipe_name = names[0]
step_name = self.pipeline.pipes[pipe_name].ordered_steps("highest")[0].step_name
elif len(names) == 2:
pipe_name = names[0]
step_name = names[1]
else:
raise ValueError("pipe_step_name should be either a pipe_name.step_name or pipe_name")
complete_name = f"{pipe_name}.{step_name}"
return self._obj.apply(
lambda session: self.pipeline.pipes[pipe_name]
.steps[step_name]
.get_disk_object(session)
.is_loadable(),
axis=1,
).rename(complete_name)
def add_ouput(self, pipe_step_name: str):
return self._obj.assign(**{pipe_step_name: self.output_exists(pipe_step_name)})
def where_output(self, pipe_step_name: str, exists: bool):
new_obj = SessionPipelineAccessor(self._obj)(self.pipeline).add_ouput(pipe_step_name)
return new_obj[new_obj[pipe_step_name] == exists]
import pandas as pd
from .typing import SessionPipelineAccessorProto
pd.DataFrame.pypeline: SessionPipelineAccessorProto # type: ignore
from typing import Protocol
import pandas as pd
from ..pipelines import Pipeline
class SessionPipelineAccessorProto(Protocol):
def __call__(self, pipeline: Pipeline) -> "SessionPipelineAccessorProto": ...
def output_exists(self, pipe_step_name: str) -> pd.Series: ...
...@@ -245,6 +245,10 @@ class BasePipe(BasePipeType, metaclass=ABCMeta): ...@@ -245,6 +245,10 @@ class BasePipe(BasePipeType, metaclass=ABCMeta):
# the dispatcher must be return a wrapped function # the dispatcher must be return a wrapped function
return function return function
def ordered_steps(self, first: Literal["lowest", "highest"] = "lowest"):
reverse = False if first == "lowest" else True
return sorted(list(self.steps.values()), key=lambda item: item.get_level(selfish=True), reverse=reverse)
def load(self, session, extra="", which: Literal["lowest", "highest"] = "highest"): def load(self, session, extra="", which: Literal["lowest", "highest"] = "highest"):
"""Load a step object for a session with optional extra data. """Load a step object for a session with optional extra data.
...@@ -260,20 +264,14 @@ class BasePipe(BasePipeType, metaclass=ABCMeta): ...@@ -260,20 +264,14 @@ class BasePipe(BasePipeType, metaclass=ABCMeta):
Raises: Raises:
ValueError: If no matching step object is found for the session. ValueError: If no matching step object is found for the session.
""" """
if which == "lowest":
reverse = False
else:
reverse = True
ordered_steps = sorted( ordered_steps = self.ordered_steps(first=which)
list(self.steps.values()), key=lambda item: item.get_level(selfish=True), reverse=reverse
)
highest_step = None highest_step = None
if isinstance(session, DataFrame): if isinstance(session, DataFrame):
# if multisession, we assume we are trying to just load sessions # if multisession, we assume we are trying to just load sessions
# that all have reached the same level of requirements. (otherwise, use generate) # that all have reached the same level of requirements. (otherwise, use generate to make them match levels)
# because of that, we use only the first session in the lot to search the highest loadable step # because of that, we use only the first session in the lot to search the highest loadable step
search_on_session = session.iloc[0] search_on_session = session.iloc[0]
else: else:
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment