Skip to content

omnipy.shared.protocols.engine.job_runner

CLASS DESCRIPTION
IsDagFlowRunnerEngine
IsFuncFlowRunnerEngine
IsLinearFlowRunnerEngine
IsTaskRunnerEngine

IsDagFlowRunnerEngine

Bases: IsEngine, Protocol

METHOD DESCRIPTION
__init__
apply_dag_flow_decorator
get_config_cls
set_config
set_registry
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobRunnerConfig

registry

TYPE: IsRunStateRegistry | None

Source code in src/omnipy/shared/protocols/engine/job_runner.py
@runtime_checkable
class IsDagFlowRunnerEngine(IsEngine, Protocol):
    """"""
    def apply_dag_flow_decorator(self, dag_flow: IsDagFlow,
                                 job_callback_accept_decorator: Callable) -> None:
        ...

config property

registry property

registry: IsRunStateRegistry | None

__init__

__init__() -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def __init__(self) -> None:
    ...

apply_dag_flow_decorator

apply_dag_flow_decorator(dag_flow: IsDagFlow, job_callback_accept_decorator: Callable) -> None
Source code in src/omnipy/shared/protocols/engine/job_runner.py
def apply_dag_flow_decorator(self, dag_flow: IsDagFlow,
                             job_callback_accept_decorator: Callable) -> None:
    ...

get_config_cls classmethod

get_config_cls() -> Type[IsJobRunnerConfig]
Source code in src/omnipy/shared/protocols/engine/base.py
@classmethod
def get_config_cls(cls) -> Type[IsJobRunnerConfig]:
    ...

set_config

set_config(config: IsJobRunnerConfig) -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def set_config(self, config: IsJobRunnerConfig) -> None:
    ...

set_registry

set_registry(registry: IsRunStateRegistry | None) -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
    ...

IsFuncFlowRunnerEngine

Bases: IsEngine, Protocol

METHOD DESCRIPTION
__init__
apply_func_flow_decorator
get_config_cls
set_config
set_registry
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobRunnerConfig

registry

TYPE: IsRunStateRegistry | None

Source code in src/omnipy/shared/protocols/engine/job_runner.py
@runtime_checkable
class IsFuncFlowRunnerEngine(IsEngine, Protocol):
    """"""
    def apply_func_flow_decorator(self,
                                  func_flow: IsFuncFlow,
                                  job_callback_accept_decorator: Callable) -> None:
        ...

config property

registry property

registry: IsRunStateRegistry | None

__init__

__init__() -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def __init__(self) -> None:
    ...

apply_func_flow_decorator

apply_func_flow_decorator(func_flow: IsFuncFlow, job_callback_accept_decorator: Callable) -> None
Source code in src/omnipy/shared/protocols/engine/job_runner.py
def apply_func_flow_decorator(self,
                              func_flow: IsFuncFlow,
                              job_callback_accept_decorator: Callable) -> None:
    ...

get_config_cls classmethod

get_config_cls() -> Type[IsJobRunnerConfig]
Source code in src/omnipy/shared/protocols/engine/base.py
@classmethod
def get_config_cls(cls) -> Type[IsJobRunnerConfig]:
    ...

set_config

set_config(config: IsJobRunnerConfig) -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def set_config(self, config: IsJobRunnerConfig) -> None:
    ...

set_registry

set_registry(registry: IsRunStateRegistry | None) -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
    ...

IsLinearFlowRunnerEngine

Bases: IsEngine, Protocol

METHOD DESCRIPTION
__init__
apply_linear_flow_decorator
get_config_cls
set_config
set_registry
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobRunnerConfig

registry

TYPE: IsRunStateRegistry | None

Source code in src/omnipy/shared/protocols/engine/job_runner.py
@runtime_checkable
class IsLinearFlowRunnerEngine(IsEngine, Protocol):
    """"""
    def apply_linear_flow_decorator(self,
                                    linear_flow: IsLinearFlow,
                                    job_callback_accept_decorator: Callable) -> None:
        ...

config property

registry property

registry: IsRunStateRegistry | None

__init__

__init__() -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def __init__(self) -> None:
    ...

apply_linear_flow_decorator

apply_linear_flow_decorator(
    linear_flow: IsLinearFlow, job_callback_accept_decorator: Callable
) -> None
Source code in src/omnipy/shared/protocols/engine/job_runner.py
def apply_linear_flow_decorator(self,
                                linear_flow: IsLinearFlow,
                                job_callback_accept_decorator: Callable) -> None:
    ...

get_config_cls classmethod

get_config_cls() -> Type[IsJobRunnerConfig]
Source code in src/omnipy/shared/protocols/engine/base.py
@classmethod
def get_config_cls(cls) -> Type[IsJobRunnerConfig]:
    ...

set_config

set_config(config: IsJobRunnerConfig) -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def set_config(self, config: IsJobRunnerConfig) -> None:
    ...

set_registry

set_registry(registry: IsRunStateRegistry | None) -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
    ...

IsTaskRunnerEngine

Bases: IsEngine, Protocol

METHOD DESCRIPTION
__init__
apply_task_decorator
get_config_cls
set_config
set_registry
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobRunnerConfig

registry

TYPE: IsRunStateRegistry | None

Source code in src/omnipy/shared/protocols/engine/job_runner.py
@runtime_checkable
class IsTaskRunnerEngine(IsEngine, Protocol):
    """"""
    def apply_task_decorator(self, task: IsTask, job_callback_accept_decorator: Callable) -> None:
        ...

config property

registry property

registry: IsRunStateRegistry | None

__init__

__init__() -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def __init__(self) -> None:
    ...

apply_task_decorator

apply_task_decorator(task: IsTask, job_callback_accept_decorator: Callable) -> None
Source code in src/omnipy/shared/protocols/engine/job_runner.py
def apply_task_decorator(self, task: IsTask, job_callback_accept_decorator: Callable) -> None:
    ...

get_config_cls classmethod

get_config_cls() -> Type[IsJobRunnerConfig]
Source code in src/omnipy/shared/protocols/engine/base.py
@classmethod
def get_config_cls(cls) -> Type[IsJobRunnerConfig]:
    ...

set_config

set_config(config: IsJobRunnerConfig) -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def set_config(self, config: IsJobRunnerConfig) -> None:
    ...

set_registry

set_registry(registry: IsRunStateRegistry | None) -> None
Source code in src/omnipy/shared/protocols/engine/base.py
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
    ...