Skip to content

Module omnipy.api.protocols.public.engine

Overview

View Source
from typing import Callable, Protocol, runtime_checkable

from omnipy.api.protocols.private.engine import IsEngine

from omnipy.api.protocols.public.compute import IsDagFlow, IsFuncFlow, IsLinearFlow, IsTask

@runtime_checkable
class IsTaskRunnerEngine(IsEngine, Protocol):
    """Structural interface for engines that declare ``apply_task_decorator``.

    Extends ``IsEngine`` with a single task-level hook. The class is marked
    ``runtime_checkable``, so ``isinstance`` checks against it are permitted.
    """

    def apply_task_decorator(
        self,
        task: IsTask,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a task and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...

@runtime_checkable
class IsLinearFlowRunnerEngine(IsEngine, Protocol):
    """Structural interface for engines that declare ``apply_linear_flow_decorator``.

    Extends ``IsEngine`` with a single linear-flow hook. The class is marked
    ``runtime_checkable``, so ``isinstance`` checks against it are permitted.
    """

    def apply_linear_flow_decorator(
        self,
        linear_flow: IsLinearFlow,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a linear flow and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...

@runtime_checkable
class IsDagFlowRunnerEngine(IsEngine, Protocol):
    """Structural interface for engines that declare ``apply_dag_flow_decorator``.

    Extends ``IsEngine`` with a single DAG-flow hook. The class is marked
    ``runtime_checkable``, so ``isinstance`` checks against it are permitted.
    """

    def apply_dag_flow_decorator(
        self,
        dag_flow: IsDagFlow,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a DAG flow and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...

@runtime_checkable
class IsFuncFlowRunnerEngine(IsEngine, Protocol):
    """Structural interface for engines that declare ``apply_func_flow_decorator``.

    Extends ``IsEngine`` with a single func-flow hook. The class is marked
    ``runtime_checkable``, so ``isinstance`` checks against it are permitted.
    """

    def apply_func_flow_decorator(
        self,
        func_flow: IsFuncFlow,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a func flow and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...

Classes

IsDagFlowRunnerEngine

class IsDagFlowRunnerEngine(
    *args,
    **kwargs
)
View Source
@runtime_checkable
class IsDagFlowRunnerEngine(IsEngine, Protocol):
    """Structural interface for engines that declare ``apply_dag_flow_decorator``.

    Extends ``IsEngine`` with a single DAG-flow hook. The class is marked
    ``runtime_checkable``, so ``isinstance`` checks against it are permitted.
    """

    def apply_dag_flow_decorator(
        self,
        dag_flow: IsDagFlow,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a DAG flow and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...

Static methods

get_config_cls
def get_config_cls(

) -> Type[omnipy.api.protocols.public.config.IsEngineConfig]

Returns:

Type Description
Type[IsEngineConfig]
View Source
    @classmethod
    def get_config_cls(cls) -> Type[IsEngineConfig]:
        """Protocol stub: class-level accessor annotated to return a config class.

        The declared return type is ``Type[IsEngineConfig]``, i.e. a class
        object rather than a config instance.
        """
        ...

Methods

apply_dag_flow_decorator
def apply_dag_flow_decorator(
    self,
    dag_flow: omnipy.api.protocols.public.compute.IsDagFlow,
    job_callback_accept_decorator: Callable
) -> None

Parameters:

Name Type Description Default
dag_flow IsDagFlow
job_callback_accept_decorator Callable

Returns:

Type Description
NoneType
View Source
    def apply_dag_flow_decorator(
        self,
        dag_flow: IsDagFlow,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a DAG flow and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...
set_config
def set_config(
    self,
    config: omnipy.api.protocols.public.config.IsEngineConfig
) -> None

Parameters:

Name Type Description Default
config IsEngineConfig

Returns:

Type Description
NoneType
View Source
    def set_config(
        self,
        config: IsEngineConfig,
    ) -> None:
        """Protocol stub: takes an ``IsEngineConfig`` object; returns ``None``.

        NOTE(review): presumably installs ``config`` on the engine - confirm
        against implementing engines.
        """
        ...
set_registry
def set_registry(
    self,
    registry: omnipy.api.protocols.private.log.IsRunStateRegistry | None
) -> None

Parameters:

Name Type Description Default
registry omnipy.api.protocols.private.log.IsRunStateRegistry | None

Returns:

Type Description
NoneType
View Source
    def set_registry(
        self,
        registry: IsRunStateRegistry | None,
    ) -> None:
        """Protocol stub: takes an ``IsRunStateRegistry`` or ``None``; returns ``None``.

        NOTE(review): ``None`` presumably means no registry is attached -
        confirm against implementing engines.
        """
        ...

IsFuncFlowRunnerEngine

class IsFuncFlowRunnerEngine(
    *args,
    **kwargs
)
View Source
@runtime_checkable
class IsFuncFlowRunnerEngine(IsEngine, Protocol):
    """Structural interface for engines that declare ``apply_func_flow_decorator``.

    Extends ``IsEngine`` with a single func-flow hook. The class is marked
    ``runtime_checkable``, so ``isinstance`` checks against it are permitted.
    """

    def apply_func_flow_decorator(
        self,
        func_flow: IsFuncFlow,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a func flow and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...

Static methods

get_config_cls
def get_config_cls(

) -> Type[omnipy.api.protocols.public.config.IsEngineConfig]

Returns:

Type Description
Type[IsEngineConfig]
View Source
    @classmethod
    def get_config_cls(cls) -> Type[IsEngineConfig]:
        """Protocol stub: class-level accessor annotated to return a config class.

        The declared return type is ``Type[IsEngineConfig]``, i.e. a class
        object rather than a config instance.
        """
        ...

Methods

apply_func_flow_decorator
def apply_func_flow_decorator(
    self,
    func_flow: omnipy.api.protocols.public.compute.IsFuncFlow,
    job_callback_accept_decorator: Callable
) -> None

Parameters:

Name Type Description Default
func_flow IsFuncFlow
job_callback_accept_decorator Callable

Returns:

Type Description
NoneType
View Source
    def apply_func_flow_decorator(
        self,
        func_flow: IsFuncFlow,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a func flow and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...
set_config
def set_config(
    self,
    config: omnipy.api.protocols.public.config.IsEngineConfig
) -> None

Parameters:

Name Type Description Default
config IsEngineConfig

Returns:

Type Description
NoneType
View Source
    def set_config(
        self,
        config: IsEngineConfig,
    ) -> None:
        """Protocol stub: takes an ``IsEngineConfig`` object; returns ``None``.

        NOTE(review): presumably installs ``config`` on the engine - confirm
        against implementing engines.
        """
        ...
set_registry
def set_registry(
    self,
    registry: omnipy.api.protocols.private.log.IsRunStateRegistry | None
) -> None

Parameters:

Name Type Description Default
registry omnipy.api.protocols.private.log.IsRunStateRegistry | None

Returns:

Type Description
NoneType
View Source
    def set_registry(
        self,
        registry: IsRunStateRegistry | None,
    ) -> None:
        """Protocol stub: takes an ``IsRunStateRegistry`` or ``None``; returns ``None``.

        NOTE(review): ``None`` presumably means no registry is attached -
        confirm against implementing engines.
        """
        ...

IsLinearFlowRunnerEngine

class IsLinearFlowRunnerEngine(
    *args,
    **kwargs
)
View Source
@runtime_checkable
class IsLinearFlowRunnerEngine(IsEngine, Protocol):
    """Structural interface for engines that declare ``apply_linear_flow_decorator``.

    Extends ``IsEngine`` with a single linear-flow hook. The class is marked
    ``runtime_checkable``, so ``isinstance`` checks against it are permitted.
    """

    def apply_linear_flow_decorator(
        self,
        linear_flow: IsLinearFlow,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a linear flow and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...

Static methods

get_config_cls
def get_config_cls(

) -> Type[omnipy.api.protocols.public.config.IsEngineConfig]

Returns:

Type Description
Type[IsEngineConfig]
View Source
    @classmethod
    def get_config_cls(cls) -> Type[IsEngineConfig]:
        """Protocol stub: class-level accessor annotated to return a config class.

        The declared return type is ``Type[IsEngineConfig]``, i.e. a class
        object rather than a config instance.
        """
        ...

Methods

apply_linear_flow_decorator
def apply_linear_flow_decorator(
    self,
    linear_flow: omnipy.api.protocols.public.compute.IsLinearFlow,
    job_callback_accept_decorator: Callable
) -> None

Parameters:

Name Type Description Default
linear_flow IsLinearFlow
job_callback_accept_decorator Callable

Returns:

Type Description
NoneType
View Source
    def apply_linear_flow_decorator(
        self,
        linear_flow: IsLinearFlow,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a linear flow and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...
set_config
def set_config(
    self,
    config: omnipy.api.protocols.public.config.IsEngineConfig
) -> None

Parameters:

Name Type Description Default
config IsEngineConfig

Returns:

Type Description
NoneType
View Source
    def set_config(
        self,
        config: IsEngineConfig,
    ) -> None:
        """Protocol stub: takes an ``IsEngineConfig`` object; returns ``None``.

        NOTE(review): presumably installs ``config`` on the engine - confirm
        against implementing engines.
        """
        ...
set_registry
def set_registry(
    self,
    registry: omnipy.api.protocols.private.log.IsRunStateRegistry | None
) -> None

Parameters:

Name Type Description Default
registry omnipy.api.protocols.private.log.IsRunStateRegistry | None

Returns:

Type Description
NoneType
View Source
    def set_registry(
        self,
        registry: IsRunStateRegistry | None,
    ) -> None:
        """Protocol stub: takes an ``IsRunStateRegistry`` or ``None``; returns ``None``.

        NOTE(review): ``None`` presumably means no registry is attached -
        confirm against implementing engines.
        """
        ...

IsTaskRunnerEngine

class IsTaskRunnerEngine(
    *args,
    **kwargs
)
View Source
@runtime_checkable
class IsTaskRunnerEngine(IsEngine, Protocol):
    """Structural interface for engines that declare ``apply_task_decorator``.

    Extends ``IsEngine`` with a single task-level hook. The class is marked
    ``runtime_checkable``, so ``isinstance`` checks against it are permitted.
    """

    def apply_task_decorator(
        self,
        task: IsTask,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a task and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...

Static methods

get_config_cls
def get_config_cls(

) -> Type[omnipy.api.protocols.public.config.IsEngineConfig]

Returns:

Type Description
Type[IsEngineConfig]
View Source
    @classmethod
    def get_config_cls(cls) -> Type[IsEngineConfig]:
        """Protocol stub: class-level accessor annotated to return a config class.

        The declared return type is ``Type[IsEngineConfig]``, i.e. a class
        object rather than a config instance.
        """
        ...

Methods

apply_task_decorator
def apply_task_decorator(
    self,
    task: omnipy.api.protocols.public.compute.IsTask,
    job_callback_accept_decorator: Callable
) -> None

Parameters:

Name Type Description Default
task IsTask
job_callback_accept_decorator Callable

Returns:

Type Description
NoneType
View Source
    def apply_task_decorator(
        self,
        task: IsTask,
        job_callback_accept_decorator: Callable,
    ) -> None:
        """Protocol stub: takes a task and a callable; returns ``None``.

        NOTE(review): the parameter name suggests the callable is handed a
        decorator built by the engine - confirm against implementing engines.
        """
        ...
set_config
def set_config(
    self,
    config: omnipy.api.protocols.public.config.IsEngineConfig
) -> None

Parameters:

Name Type Description Default
config IsEngineConfig

Returns:

Type Description
NoneType
View Source
    def set_config(
        self,
        config: IsEngineConfig,
    ) -> None:
        """Protocol stub: takes an ``IsEngineConfig`` object; returns ``None``.

        NOTE(review): presumably installs ``config`` on the engine - confirm
        against implementing engines.
        """
        ...
set_registry
def set_registry(
    self,
    registry: omnipy.api.protocols.private.log.IsRunStateRegistry | None
) -> None

Parameters:

Name Type Description Default
registry omnipy.api.protocols.private.log.IsRunStateRegistry | None

Returns:

Type Description
NoneType
View Source
    def set_registry(
        self,
        registry: IsRunStateRegistry | None,
    ) -> None:
        """Protocol stub: takes an ``IsRunStateRegistry`` or ``None``; returns ``None``.

        NOTE(review): ``None`` presumably means no registry is attached -
        confirm against implementing engines.
        """
        ...