Skip to content

omnipy.shared.protocols.hub.runtime

CLASS DESCRIPTION
IsRootLogObjects
IsRuntime
IsRuntimeConfig
IsRuntimeObjects

IsRootLogObjects

Bases: Protocol

METHOD DESCRIPTION
set_config
ATTRIBUTE DESCRIPTION
config

TYPE: IsRootLogConfig

file_handler

TYPE: RotatingFileHandler | None

formatter

TYPE: logging.Formatter | None

stderr_handler

TYPE: logging.StreamHandler | None

stdout_handler

TYPE: logging.StreamHandler | None

Source code in src/omnipy/shared/protocols/hub/runtime.py
@runtime_checkable
class IsRootLogObjects(Protocol):
    """Structural protocol for an object holding root-log logging objects.

    Declares four optional attributes (a formatter plus stdout, stderr and
    file handlers), each defaulting to ``None``, a ``set_config`` method and
    a ``config`` property. Because the class is ``runtime_checkable``, it can
    be used with ``isinstance()``; such checks only verify that the members
    exist, not their types or signatures.
    """
    # Optional logging objects; ``None`` is the declared default for each.
    formatter: logging.Formatter | None = None
    stdout_handler: logging.StreamHandler | None = None
    stderr_handler: logging.StreamHandler | None = None
    file_handler: RotatingFileHandler | None = None

    def set_config(self, config: IsRootLogConfig) -> None:
        """Accept an ``IsRootLogConfig`` object.

        Protocol stub: the effect of the call is defined by implementations
        and is not visible here.
        """
        ...

    @property
    def config(self) -> IsRootLogConfig:
        """Return an ``IsRootLogConfig`` object (only a getter is declared).

        NOTE(review): presumably the config last passed to ``set_config`` --
        confirm against the implementation.
        """
        ...

config property

config: IsRootLogConfig

file_handler class-attribute instance-attribute

file_handler: RotatingFileHandler | None = None

formatter class-attribute instance-attribute

formatter: logging.Formatter | None = None

stderr_handler class-attribute instance-attribute

stderr_handler: logging.StreamHandler | None = None

stdout_handler class-attribute instance-attribute

stdout_handler: logging.StreamHandler | None = None

set_config

set_config(config: IsRootLogConfig) -> None
Source code in src/omnipy/shared/protocols/hub/runtime.py
def set_config(self, config: IsRootLogConfig) -> None:
    """Protocol stub: accept an ``IsRootLogConfig`` object and return ``None``.

    The body is a placeholder; behaviour is defined by implementations.
    """
    ...

IsRuntime

Bases: Protocol

METHOD DESCRIPTION
reset_subscriptions
ATTRIBUTE DESCRIPTION
config

TYPE: IsRuntimeConfig

objects

TYPE: IsRuntimeObjects

Source code in src/omnipy/shared/protocols/hub/runtime.py
@runtime_checkable
class IsRuntime(Protocol):
    """Structural protocol for a runtime object.

    Requires two instance attributes, ``config`` and ``objects``, and a
    ``reset_subscriptions`` method. Because the class is
    ``runtime_checkable``, ``isinstance()`` checks verify member presence
    only.
    """
    config: IsRuntimeConfig  # runtime configuration object
    objects: IsRuntimeObjects  # runtime objects container

    # NOTE(review): no return annotation here, while the other methods on
    # this page declare ``-> None`` -- confirm and annotate in the source.
    def reset_subscriptions(self):
        """Protocol stub; takes no arguments besides ``self``.

        NOTE(review): presumably re-establishes subscriptions between
        ``config`` and ``objects`` -- inferred from the name only, confirm
        against the implementation.
        """
        ...

config instance-attribute

config: IsRuntimeConfig

objects instance-attribute

objects: IsRuntimeObjects

reset_subscriptions

reset_subscriptions()
Source code in src/omnipy/shared/protocols/hub/runtime.py
def reset_subscriptions(self):
    """Protocol stub with no parameters besides ``self``.

    No return type is annotated; the body is a placeholder and behaviour is
    defined by implementations.
    """
    ...

IsRuntimeConfig

Bases: IsConfigBase, Protocol

METHOD DESCRIPTION
as_model
deepcopy
default_repr_to_terminal_str
reset_to_defaults
subscribe
subscribe_attr
unsubscribe_all
ATTRIBUTE DESCRIPTION
data

TYPE: IsDataConfig

engine

TYPE: IsEngineConfig

job

TYPE: IsJobConfig

root_log

TYPE: IsRootLogConfig

Source code in src/omnipy/shared/protocols/hub/runtime.py
@runtime_checkable
class IsRuntimeConfig(IsConfigBase, Protocol):
    """Structural protocol for the top-level runtime configuration.

    Extends ``IsConfigBase`` with four sub-configuration attributes
    (``data``, ``engine``, ``job`` and ``root_log``) and a
    ``reset_to_defaults`` method. Because the class is
    ``runtime_checkable``, ``isinstance()`` checks verify member presence
    only.
    """
    # One attribute per configuration section.
    data: IsDataConfig
    engine: IsEngineConfig
    job: IsJobConfig
    root_log: IsRootLogConfig

    def reset_to_defaults(self) -> None:
        """Protocol stub returning ``None``.

        NOTE(review): presumably restores every section to its default
        values -- inferred from the name only, confirm against the
        implementation.
        """
        ...

data instance-attribute

data: IsDataConfig

engine instance-attribute

engine: IsEngineConfig

job instance-attribute

job: IsJobConfig

root_log instance-attribute

root_log: IsRootLogConfig

as_model

as_model() -> IsModel[dict[str, object]]
Source code in src/omnipy/shared/protocols/config.py
def as_model(self) -> 'IsModel[dict[str, object]]':
    """Protocol stub returning an ``IsModel[dict[str, object]]``.

    The return annotation is written as a string (forward reference). The
    body is a placeholder; behaviour is defined by implementations.
    """
    ...

deepcopy

deepcopy() -> Self
Source code in src/omnipy/shared/protocols/util.py
def deepcopy(self) -> Self:
    """Protocol stub returning an object of the same type as ``self``.

    NOTE(review): presumably a deep copy, going by the name -- confirm
    against the implementation.
    """
    ...

default_repr_to_terminal_str

default_repr_to_terminal_str(ui_type: TerminalOutputUserInterfaceType.Literals) -> str
Source code in src/omnipy/shared/protocols/config.py
def default_repr_to_terminal_str(
    self,
    ui_type: TerminalOutputUserInterfaceType.Literals,
) -> str:
    """Protocol stub mapping a terminal UI type to a string.

    Args:
        ui_type: One of ``TerminalOutputUserInterfaceType.Literals``.

    Returns:
        A ``str``. NOTE(review): presumably the default representation
        formatted for that terminal type -- confirm against the
        implementation.
    """
    ...

reset_to_defaults

reset_to_defaults() -> None
Source code in src/omnipy/shared/protocols/hub/runtime.py
def reset_to_defaults(self) -> None:
    """Protocol stub with no parameters besides ``self``; returns ``None``.

    The body is a placeholder; behaviour is defined by implementations.
    """
    ...

subscribe

subscribe(callback_fun: Callable[..., None], do_callback: bool = True) -> None
Source code in src/omnipy/shared/protocols/util.py
def subscribe(self, callback_fun: Callable[..., None], do_callback: bool = True) -> None:
    """Protocol stub: register ``callback_fun`` as a subscriber.

    Args:
        callback_fun: Callable accepting any arguments and returning
            ``None``.
        do_callback: Defaults to ``True``. NOTE(review): presumably
            triggers an immediate call of ``callback_fun`` on
            subscription -- confirm against the implementation.
    """
    ...

subscribe_attr

subscribe_attr(attr_name: str, callback_fun: Callable[..., None])
Source code in src/omnipy/shared/protocols/util.py
def subscribe_attr(self, attr_name: str, callback_fun: Callable[..., None]):
    """Protocol stub: register ``callback_fun`` for the attribute ``attr_name``.

    No return type is annotated. NOTE(review): presumably the callback is
    invoked when the named attribute changes -- confirm against the
    implementation.
    """
    ...

unsubscribe_all

unsubscribe_all() -> None
Source code in src/omnipy/shared/protocols/util.py
def unsubscribe_all(self) -> None:
    """Protocol stub with no parameters besides ``self``; returns ``None``.

    NOTE(review): presumably removes every registered subscriber --
    inferred from the name only, confirm against the implementation.
    """
    ...

IsRuntimeObjects

Bases: IsDataPublisher, Protocol

METHOD DESCRIPTION
deepcopy
setup_reactive
subscribe
subscribe_attr
unsubscribe_all
ATTRIBUTE DESCRIPTION
data_class_creator

TYPE: IsDataClassCreator

job_creator

TYPE: IsJobConfigHolder

local

TYPE: IsEngine

prefect

TYPE: IsEngine

reactive

TYPE: IsReactiveObjects | None

registry

TYPE: IsRunStateRegistry

root_log

TYPE: IsRootLogObjects

serializers

TYPE: IsSerializerRegistry

Source code in src/omnipy/shared/protocols/hub/runtime.py
@runtime_checkable
class IsRuntimeObjects(IsDataPublisher, Protocol):
    """Structural protocol for the container of runtime objects.

    Extends ``IsDataPublisher`` with eight instance attributes (job and data
    class creators, optional reactive objects, two engines, a run state
    registry, a serializer registry and the root-log objects) and a
    ``setup_reactive`` method. Because the class is ``runtime_checkable``,
    ``isinstance()`` checks verify member presence only.
    """

    job_creator: IsJobConfigHolder
    data_class_creator: IsDataClassCreator
    reactive: IsReactiveObjects | None  # the only attribute that may be None
    # Two engine slots sharing the same ``IsEngine`` protocol.
    local: IsEngine
    prefect: IsEngine
    registry: IsRunStateRegistry
    serializers: IsSerializerRegistry
    root_log: IsRootLogObjects

    def setup_reactive(self, ui_type: UserInterfaceType.Literals) -> None:
        """Protocol stub taking one of ``UserInterfaceType.Literals``.

        NOTE(review): presumably populates ``reactive`` for the given user
        interface type -- inferred from the name only, confirm against the
        implementation.
        """
        ...

data_class_creator instance-attribute

data_class_creator: IsDataClassCreator

job_creator instance-attribute

job_creator: IsJobConfigHolder

local instance-attribute

local: IsEngine

prefect instance-attribute

prefect: IsEngine

reactive instance-attribute

reactive: IsReactiveObjects | None

registry instance-attribute

registry: IsRunStateRegistry

root_log instance-attribute

root_log: IsRootLogObjects

serializers instance-attribute

serializers: IsSerializerRegistry

deepcopy

deepcopy() -> Self
Source code in src/omnipy/shared/protocols/util.py
def deepcopy(self) -> Self:
    """Protocol stub returning an object of the same type as ``self``.

    NOTE(review): presumably a deep copy, going by the name -- confirm
    against the implementation.
    """
    ...

setup_reactive

setup_reactive(ui_type: UserInterfaceType.Literals) -> None
Source code in src/omnipy/shared/protocols/hub/runtime.py
def setup_reactive(self, ui_type: UserInterfaceType.Literals) -> None:
    """Protocol stub: accept one of ``UserInterfaceType.Literals``; returns ``None``.

    The body is a placeholder; behaviour is defined by implementations.
    """
    ...

subscribe

subscribe(callback_fun: Callable[..., None], do_callback: bool = True) -> None
Source code in src/omnipy/shared/protocols/util.py
def subscribe(self, callback_fun: Callable[..., None], do_callback: bool = True) -> None:
    """Protocol stub: register ``callback_fun`` as a subscriber.

    Args:
        callback_fun: Callable accepting any arguments and returning
            ``None``.
        do_callback: Defaults to ``True``. NOTE(review): presumably
            triggers an immediate call of ``callback_fun`` on
            subscription -- confirm against the implementation.
    """
    ...

subscribe_attr

subscribe_attr(attr_name: str, callback_fun: Callable[..., None])
Source code in src/omnipy/shared/protocols/util.py
def subscribe_attr(self, attr_name: str, callback_fun: Callable[..., None]):
    """Protocol stub: register ``callback_fun`` for the attribute ``attr_name``.

    No return type is annotated. NOTE(review): presumably the callback is
    invoked when the named attribute changes -- confirm against the
    implementation.
    """
    ...

unsubscribe_all

unsubscribe_all() -> None
Source code in src/omnipy/shared/protocols/util.py
def unsubscribe_all(self) -> None:
    """Protocol stub with no parameters besides ``self``; returns ``None``.

    NOTE(review): presumably removes every registered subscriber --
    inferred from the name only, confirm against the implementation.
    """
    ...