Skip to content

omnipy.hub.runtime

CLASS DESCRIPTION
Runtime
RuntimeConfig
RuntimeObjects
ATTRIBUTE DESCRIPTION
runtime

TYPE: IsRuntime

runtime module-attribute

runtime: IsRuntime = Runtime()

Runtime

Bases: DataPublisher

METHOD DESCRIPTION
__init__
reset_backlinks
reset_subscriptions

Resets all subscriptions for the current instance.

ATTRIBUTE DESCRIPTION
config

TYPE: IsRuntimeConfig

objects

TYPE: IsRuntimeObjects

Source code in src/omnipy/hub/runtime.py
class Runtime(DataPublisher):
    config: IsRuntimeConfig = pyd.Field(default_factory=RuntimeConfig)
    objects: IsRuntimeObjects = pyd.Field(default_factory=RuntimeObjects)

    def __init__(self, **data: Any) -> None:
        super().__init__(**data)
        detect_and_setup_user_interface(self)
        self.reset_subscriptions()

    def reset_subscriptions(self) -> None:
        """
        Resets all subscriptions for the current instance.

        This function unsubscribes all existing subscriptions and then sets up new subscriptions
        for the `config` and `objects` members.
        """

        self.reset_backlinks()

        self.config.unsubscribe_all()
        self.objects.unsubscribe_all()

        # Makes sure that the config references in the runtime objects always refer to the related
        # config in runtime, even when:
        #
        # 1. The runtime config is replaced with a new config object, or
        # 2. The runtime object is replaced with a new runtime object, or
        # 3. Both of the above
        #
        # This might e.g. happen due to the use of  mock objects for testing, or if the config is
        # reloaded from a file.

        self.config.subscribe_attr('data', self.objects.data_class_creator.set_config)
        self.config.subscribe_attr('job', self.objects.job_creator.set_config)
        self.config.subscribe_attr('root_log', self.objects.root_log.set_config)

        self.config.data.ui.subscribe_attr('detected_type', self.objects.setup_reactive)

        if UserInterfaceType.is_jupyter_in_browser(self.config.data.ui.detected_type):
            assert self.objects.reactive is not None
            self.config.data.ui.subscribe_attr('jupyter',
                                               self.objects.reactive.jupyter_ui_config.set)
            self.config.data.ui.subscribe_attr('text', self.objects.reactive.text_config.set)
            self.config.data.ui.subscribe_attr('layout', self.objects.reactive.layout_config.set)

        self.config.engine.subscribe_attr('local', self.objects.local.set_config)
        self.config.engine.subscribe_attr('prefect', self.objects.prefect.set_config)

        self.objects.subscribe_attr(
            'reactive',
            self.objects.data_class_creator.set_reactive_objects,
        )

        # Makes sure that the registry references in the job runners always refer to the registry
        # object in runtime, even when one or both of the objects are replaced with new objects.
        self.objects.subscribe_attr('registry', self.objects.local.set_registry)
        self.objects.subscribe_attr('registry', self.objects.prefect.set_registry)

        # Makes sure that the engine used by the job creator is always the one specified in the
        # 'engine' config item in runtime
        self.config.engine.subscribe_attr('local', self._update_job_creator_engine)
        self.config.engine.subscribe_attr('prefect', self._update_job_creator_engine)
        self.config.engine.subscribe_attr('choice', self._update_job_creator_engine)

        # Makes sure that the local and prefect engine configs are always reloaded when the local
        # or prefect engine objects are replaced with new engine objects. This is necessary because
        # the `get_config_cls()` method of the engine objects define the classes of the respective
        # engine config objects to be used. If an engine object is replaced with a new engine that
        # require a different engine config class than is currently used, the config object will be
        # replaced with a new default config object of the correct type.
        self.objects.subscribe_attr('local', self._update_local_runner_config)
        self.objects.subscribe_attr('prefect', self._update_prefect_engine_config)

    def reset_backlinks(self) -> None:
        self.config._back = self  # type: ignore[attr-defined]
        self.objects._back = self  # type: ignore[attr-defined]

    def _get_engine_config(self, choice: EngineChoice.Literals) -> IsEngineConfig:
        return getattr(self.config.engine, choice)

    def _set_engine_config(
        self,
        choice: EngineChoice.Literals,
        engine_config: IsJobRunnerConfig,
    ) -> None:
        setattr(self.config.engine, choice, engine_config)

    def _get_engine(self, choice: EngineChoice.Literals) -> IsEngine:
        return getattr(self.objects, choice)

    def _new_engine_config_if_new_cls(
        self,
        engine: IsEngine,
        choice: EngineChoice.Literals,
    ) -> None:
        # TODO: when parsing config from file is implemented, make sure that the new engine
        #       config classes here reparse the config files
        engine_config_cls = engine.get_config_cls()
        if self._get_engine_config(choice).__class__ is not engine_config_cls:
            self._set_engine_config(choice, engine_config_cls())

    def _update_local_runner_config(self, local_runner: IsEngine) -> None:
        self._new_engine_config_if_new_cls(local_runner, EngineChoice.LOCAL)

    def _update_prefect_engine_config(self, prefect_engine: IsEngine) -> None:
        self._new_engine_config_if_new_cls(prefect_engine, EngineChoice.PREFECT)

    def _update_job_creator_engine(self, _item_changed: Any) -> None:
        self.objects.job_creator.set_engine(self._get_engine(self.config.engine.choice))

config class-attribute instance-attribute

config: IsRuntimeConfig = pyd.Field(default_factory=RuntimeConfig)

objects class-attribute instance-attribute

objects: IsRuntimeObjects = pyd.Field(default_factory=RuntimeObjects)

__init__

__init__(**data: Any) -> None
Source code in src/omnipy/hub/runtime.py
def __init__(self, **data: Any) -> None:
    super().__init__(**data)
    detect_and_setup_user_interface(self)
    self.reset_subscriptions()
reset_backlinks() -> None
Source code in src/omnipy/hub/runtime.py
def reset_backlinks(self) -> None:
    self.config._back = self  # type: ignore[attr-defined]
    self.objects._back = self  # type: ignore[attr-defined]

reset_subscriptions

reset_subscriptions() -> None

Resets all subscriptions for the current instance.

This function unsubscribes all existing subscriptions and then sets up new subscriptions for the config and objects members.

Source code in src/omnipy/hub/runtime.py
def reset_subscriptions(self) -> None:
    """
    Resets all subscriptions for the current instance.

    This function unsubscribes all existing subscriptions and then sets up new subscriptions
    for the `config` and `objects` members.
    """

    self.reset_backlinks()

    self.config.unsubscribe_all()
    self.objects.unsubscribe_all()

    # Makes sure that the config references in the runtime objects always refer to the related
    # config in runtime, even when:
    #
    # 1. The runtime config is replaced with a new config object, or
    # 2. The runtime object is replaced with a new runtime object, or
    # 3. Both of the above
    #
    # This might e.g. happen due to the use of  mock objects for testing, or if the config is
    # reloaded from a file.

    self.config.subscribe_attr('data', self.objects.data_class_creator.set_config)
    self.config.subscribe_attr('job', self.objects.job_creator.set_config)
    self.config.subscribe_attr('root_log', self.objects.root_log.set_config)

    self.config.data.ui.subscribe_attr('detected_type', self.objects.setup_reactive)

    if UserInterfaceType.is_jupyter_in_browser(self.config.data.ui.detected_type):
        assert self.objects.reactive is not None
        self.config.data.ui.subscribe_attr('jupyter',
                                           self.objects.reactive.jupyter_ui_config.set)
        self.config.data.ui.subscribe_attr('text', self.objects.reactive.text_config.set)
        self.config.data.ui.subscribe_attr('layout', self.objects.reactive.layout_config.set)

    self.config.engine.subscribe_attr('local', self.objects.local.set_config)
    self.config.engine.subscribe_attr('prefect', self.objects.prefect.set_config)

    self.objects.subscribe_attr(
        'reactive',
        self.objects.data_class_creator.set_reactive_objects,
    )

    # Makes sure that the registry references in the job runners always refer to the registry
    # object in runtime, even when one or both of the objects are replaced with new objects.
    self.objects.subscribe_attr('registry', self.objects.local.set_registry)
    self.objects.subscribe_attr('registry', self.objects.prefect.set_registry)

    # Makes sure that the engine used by the job creator is always the one specified in the
    # 'engine' config item in runtime
    self.config.engine.subscribe_attr('local', self._update_job_creator_engine)
    self.config.engine.subscribe_attr('prefect', self._update_job_creator_engine)
    self.config.engine.subscribe_attr('choice', self._update_job_creator_engine)

    # Makes sure that the local and prefect engine configs are always reloaded when the local
    # or prefect engine objects are replaced with new engine objects. This is necessary because
    # the `get_config_cls()` method of the engine objects define the classes of the respective
    # engine config objects to be used. If an engine object is replaced with a new engine that
    # require a different engine config class than is currently used, the config object will be
    # replaced with a new default config object of the correct type.
    self.objects.subscribe_attr('local', self._update_local_runner_config)
    self.objects.subscribe_attr('prefect', self._update_prefect_engine_config)

RuntimeConfig

Bases: RuntimeEntryPublisher, ConfigBase

METHOD DESCRIPTION
reset_to_defaults
ATTRIBUTE DESCRIPTION
data

TYPE: IsDataConfig

engine

TYPE: IsEngineConfig

job

TYPE: IsJobConfig

root_log

TYPE: IsRootLogConfig

Source code in src/omnipy/hub/runtime.py
class RuntimeConfig(RuntimeEntryPublisher, ConfigBase):
    data: IsDataConfig = pyd.Field(default_factory=_data_config_factory)
    engine: IsEngineConfig = pyd.Field(default_factory=EngineConfig)
    job: IsJobConfig = pyd.Field(default_factory=_job_config_factory)
    root_log: IsRootLogConfig = pyd.Field(default_factory=RootLogConfig)

    def reset_to_defaults(self) -> None:
        prev_back = self._back
        self._back = None

        self.data = DataConfig()
        self.engine = EngineConfig()
        self.job = JobConfig()
        self.root_log = RootLogConfig()

        self._back = prev_back
        if self._back is not None:
            self._back.reset_subscriptions()

data class-attribute instance-attribute

data: IsDataConfig = pyd.Field(default_factory=_data_config_factory)

engine class-attribute instance-attribute

engine: IsEngineConfig = pyd.Field(default_factory=EngineConfig)

job class-attribute instance-attribute

job: IsJobConfig = pyd.Field(default_factory=_job_config_factory)

root_log class-attribute instance-attribute

root_log: IsRootLogConfig = pyd.Field(default_factory=RootLogConfig)

reset_to_defaults

reset_to_defaults() -> None
Source code in src/omnipy/hub/runtime.py
def reset_to_defaults(self) -> None:
    prev_back = self._back
    self._back = None

    self.data = DataConfig()
    self.engine = EngineConfig()
    self.job = JobConfig()
    self.root_log = RootLogConfig()

    self._back = prev_back
    if self._back is not None:
        self._back.reset_subscriptions()

RuntimeObjects

Bases: RuntimeEntryPublisher, DataPublisher

METHOD DESCRIPTION
setup_reactive
ATTRIBUTE DESCRIPTION
data_class_creator

TYPE: IsDataClassCreator

job_creator

TYPE: IsJobConfigHolder

local

TYPE: IsEngine

prefect

TYPE: IsEngine

reactive

TYPE: IsReactiveObjects | None

registry

TYPE: IsRunStateRegistry

root_log

TYPE: IsRootLogObjects

serializers

TYPE: IsSerializerRegistry

Source code in src/omnipy/hub/runtime.py
class RuntimeObjects(RuntimeEntryPublisher, DataPublisher):
    job_creator: IsJobConfigHolder = pyd.Field(default_factory=_job_creator_factory)
    data_class_creator: IsDataClassCreator = pyd.Field(default_factory=_data_class_creator_factory)
    reactive: IsReactiveObjects | None = None
    local: IsEngine = pyd.Field(default_factory=LocalRunner)
    prefect: IsEngine = pyd.Field(default_factory=PrefectEngine)
    registry: IsRunStateRegistry = pyd.Field(default_factory=RunStateRegistry)
    serializers: IsSerializerRegistry = pyd.Field(default_factory=SerializerRegistry)
    root_log: IsRootLogObjects = pyd.Field(default_factory=RootLogObjects)

    def setup_reactive(self, ui_type: UserInterfaceType.Literals) -> None:
        if UserInterfaceType.is_jupyter_in_browser(ui_type):
            from omnipy.data._display.integrations.jupyter.helpers import ReactiveObjects

            if self.reactive is None:
                self.reactive = ReactiveObjects()
        else:
            if self.reactive is not None:
                self.reactive = None

data_class_creator class-attribute instance-attribute

data_class_creator: IsDataClassCreator = pyd.Field(default_factory=_data_class_creator_factory)

job_creator class-attribute instance-attribute

job_creator: IsJobConfigHolder = pyd.Field(default_factory=_job_creator_factory)

local class-attribute instance-attribute

local: IsEngine = pyd.Field(default_factory=LocalRunner)

prefect class-attribute instance-attribute

prefect: IsEngine = pyd.Field(default_factory=PrefectEngine)

reactive class-attribute instance-attribute

reactive: IsReactiveObjects | None = None

registry class-attribute instance-attribute

registry: IsRunStateRegistry = pyd.Field(default_factory=RunStateRegistry)

root_log class-attribute instance-attribute

root_log: IsRootLogObjects = pyd.Field(default_factory=RootLogObjects)

serializers class-attribute instance-attribute

serializers: IsSerializerRegistry = pyd.Field(default_factory=SerializerRegistry)

setup_reactive

setup_reactive(ui_type: UserInterfaceType.Literals) -> None
Source code in src/omnipy/hub/runtime.py
def setup_reactive(self, ui_type: UserInterfaceType.Literals) -> None:
    if UserInterfaceType.is_jupyter_in_browser(ui_type):
        from omnipy.data._display.integrations.jupyter.helpers import ReactiveObjects

        if self.reactive is None:
            self.reactive = ReactiveObjects()
    else:
        if self.reactive is not None:
            self.reactive = None