Module omnipy.api.protocols.public.engine
Overview
View Source
from typing import Callable, Protocol, runtime_checkable
from omnipy.api.protocols.private.engine import IsEngine
from omnipy.api.protocols.public.compute import IsDagFlow, IsFuncFlow, IsLinearFlow, IsTask
@runtime_checkable
class IsTaskRunnerEngine(IsEngine, Protocol):
""""""
def apply_task_decorator(self, task: IsTask, job_callback_accept_decorator: Callable) -> None:
...
@runtime_checkable
class IsLinearFlowRunnerEngine(IsEngine, Protocol):
""""""
def apply_linear_flow_decorator(self,
linear_flow: IsLinearFlow,
job_callback_accept_decorator: Callable) -> None:
...
@runtime_checkable
class IsDagFlowRunnerEngine(IsEngine, Protocol):
""""""
def apply_dag_flow_decorator(self, dag_flow: IsDagFlow,
job_callback_accept_decorator: Callable) -> None:
...
@runtime_checkable
class IsFuncFlowRunnerEngine(IsEngine, Protocol):
""""""
def apply_func_flow_decorator(self,
func_flow: IsFuncFlow,
job_callback_accept_decorator: Callable) -> None:
...
Classes
IsDagFlowRunnerEngine
View Source
@runtime_checkable
class IsDagFlowRunnerEngine(IsEngine, Protocol):
""""""
def apply_dag_flow_decorator(self, dag_flow: IsDagFlow,
job_callback_accept_decorator: Callable) -> None:
...
Static methods
get_config_cls
Returns:
Type | Description |
---|---|
Type[IsEngineConfig] |
View Source
@classmethod
def get_config_cls(cls) -> Type[IsEngineConfig]:
...
Methods
apply_dag_flow_decorator
def apply_dag_flow_decorator(
self,
dag_flow: omnipy.api.protocols.public.compute.IsDagFlow,
job_callback_accept_decorator: Callable
) -> None
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dag_flow |
IsDagFlow |
||
job_callback_accept_decorator |
Callable |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def apply_dag_flow_decorator(self, dag_flow: IsDagFlow,
job_callback_accept_decorator: Callable) -> None:
...
set_config
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
IsEngineConfig |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def set_config(self, config: IsEngineConfig) -> None:
...
set_registry
def set_registry(
self,
registry: omnipy.api.protocols.private.log.IsRunStateRegistry | None
) -> None
Parameters:
Name | Type | Description | Default |
---|---|---|---|
registry |
omnipy.api.protocols.private.log.IsRunStateRegistry |
None |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
...
IsFuncFlowRunnerEngine
View Source
@runtime_checkable
class IsFuncFlowRunnerEngine(IsEngine, Protocol):
""""""
def apply_func_flow_decorator(self,
func_flow: IsFuncFlow,
job_callback_accept_decorator: Callable) -> None:
...
Static methods
get_config_cls
Returns:
Type | Description |
---|---|
Type[IsEngineConfig] |
View Source
@classmethod
def get_config_cls(cls) -> Type[IsEngineConfig]:
...
Methods
apply_func_flow_decorator
def apply_func_flow_decorator(
self,
func_flow: omnipy.api.protocols.public.compute.IsFuncFlow,
job_callback_accept_decorator: Callable
) -> None
Parameters:
Name | Type | Description | Default |
---|---|---|---|
func_flow |
IsFuncFlow |
||
job_callback_accept_decorator |
Callable |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def apply_func_flow_decorator(self,
func_flow: IsFuncFlow,
job_callback_accept_decorator: Callable) -> None:
...
set_config
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
IsEngineConfig |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def set_config(self, config: IsEngineConfig) -> None:
...
set_registry
def set_registry(
self,
registry: omnipy.api.protocols.private.log.IsRunStateRegistry | None
) -> None
Parameters:
Name | Type | Description | Default |
---|---|---|---|
registry |
omnipy.api.protocols.private.log.IsRunStateRegistry |
None |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
...
IsLinearFlowRunnerEngine
View Source
@runtime_checkable
class IsLinearFlowRunnerEngine(IsEngine, Protocol):
""""""
def apply_linear_flow_decorator(self,
linear_flow: IsLinearFlow,
job_callback_accept_decorator: Callable) -> None:
...
Static methods
get_config_cls
Returns:
Type | Description |
---|---|
Type[IsEngineConfig] |
View Source
@classmethod
def get_config_cls(cls) -> Type[IsEngineConfig]:
...
Methods
apply_linear_flow_decorator
def apply_linear_flow_decorator(
self,
linear_flow: omnipy.api.protocols.public.compute.IsLinearFlow,
job_callback_accept_decorator: Callable
) -> None
Parameters:
Name | Type | Description | Default |
---|---|---|---|
linear_flow |
IsLinearFlow |
||
job_callback_accept_decorator |
Callable |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def apply_linear_flow_decorator(self,
linear_flow: IsLinearFlow,
job_callback_accept_decorator: Callable) -> None:
...
set_config
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
IsEngineConfig |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def set_config(self, config: IsEngineConfig) -> None:
...
set_registry
def set_registry(
self,
registry: omnipy.api.protocols.private.log.IsRunStateRegistry | None
) -> None
Parameters:
Name | Type | Description | Default |
---|---|---|---|
registry |
omnipy.api.protocols.private.log.IsRunStateRegistry |
None |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
...
IsTaskRunnerEngine
View Source
@runtime_checkable
class IsTaskRunnerEngine(IsEngine, Protocol):
""""""
def apply_task_decorator(self, task: IsTask, job_callback_accept_decorator: Callable) -> None:
...
Static methods
get_config_cls
Returns:
Type | Description |
---|---|
Type[IsEngineConfig] |
View Source
@classmethod
def get_config_cls(cls) -> Type[IsEngineConfig]:
...
Methods
apply_task_decorator
def apply_task_decorator(
self,
task: omnipy.api.protocols.public.compute.IsTask,
job_callback_accept_decorator: Callable
) -> None
Parameters:
Name | Type | Description | Default |
---|---|---|---|
task |
IsTask |
||
job_callback_accept_decorator |
Callable |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def apply_task_decorator(self, task: IsTask, job_callback_accept_decorator: Callable) -> None:
...
set_config
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config |
IsEngineConfig |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def set_config(self, config: IsEngineConfig) -> None:
...
set_registry
def set_registry(
self,
registry: omnipy.api.protocols.private.log.IsRunStateRegistry | None
) -> None
Parameters:
Name | Type | Description | Default |
---|---|---|---|
registry |
omnipy.api.protocols.private.log.IsRunStateRegistry |
None |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
...