Skip to content

Module omnipy.hub.runtime

Overview

View Source
from dataclasses import dataclass, field

from typing import Any

from omnipy.api.enums import EngineChoice

from omnipy.api.protocols.private.compute.job_creator import IsJobConfigHolder

from omnipy.api.protocols.private.engine import IsEngine

from omnipy.api.protocols.private.log import IsRunStateRegistry

from omnipy.api.protocols.public.config import (IsEngineConfig,

                                                IsJobConfig,

                                                IsLocalRunnerConfig,

                                                IsPrefectEngineConfig,

                                                IsRootLogConfig)

from omnipy.api.protocols.public.hub import IsRootLogObjects, IsRuntimeConfig, IsRuntimeObjects

from omnipy.compute.job import JobBase

from omnipy.config.engine import LocalRunnerConfig, PrefectEngineConfig

from omnipy.config.job import JobConfig

from omnipy.data.serializer import SerializerRegistry

from omnipy.engine.local import LocalRunner

from omnipy.hub.entry import DataPublisher, RuntimeEntryPublisher

from omnipy.hub.root_log import RootLogConfigEntryPublisher, RootLogObjects

from omnipy.log.registry import RunStateRegistry

from omnipy.modules.prefect.engine.prefect import PrefectEngine

def _job_creator_factory():
    """
    Return `JobBase.job_creator`.

    Wrapped in a function so it can be passed as the `default_factory` of the
    `RuntimeObjects.job_creator` dataclass field.
    """
    job_creator = JobBase.job_creator
    return job_creator

@dataclass

class RuntimeConfig(RuntimeEntryPublisher):

    """
    Configuration entries of the runtime.

    `Runtime.reset_subscriptions` subscribes to these fields and passes their values
    on to the corresponding members of `RuntimeObjects`.
    """

    # Job configuration; passed to `objects.job_creator.set_config`.
    job: IsJobConfig = field(default_factory=JobConfig)

    # Selects which member of `RuntimeObjects` is handed to the job creator as its
    # engine (see `Runtime._update_job_creator_engine`). Defaults to the local engine.
    engine: EngineChoice = EngineChoice.LOCAL

    # Configuration passed to `objects.local.set_config`.
    local: IsLocalRunnerConfig = field(default_factory=LocalRunnerConfig)

    # Configuration passed to `objects.prefect.set_config`.
    prefect: IsPrefectEngineConfig = field(default_factory=PrefectEngineConfig)

    # Configuration passed to `objects.root_log.set_config`. `Runtime.__post_init__`
    # also points the `_back` reference of this entry at the runtime.
    root_log: IsRootLogConfig = field(default_factory=RootLogConfigEntryPublisher)

@dataclass

class RuntimeObjects(RuntimeEntryPublisher):

    """
    Objects of the runtime that are configured from `RuntimeConfig`.

    `Runtime.reset_subscriptions` wires the config entries to the `set_config`
    methods of these objects and the `registry` to the engines' `set_registry`.
    """

    # Receives the `job` config via `set_config` and the selected engine via
    # `set_engine`.
    job_creator: IsJobConfigHolder = field(default_factory=_job_creator_factory)

    # Engine stored under the name 'local'; receives the `local` config and the
    # registry.
    local: IsEngine = field(default_factory=LocalRunner)

    # Engine stored under the name 'prefect'; receives the `prefect` config and the
    # registry.
    prefect: IsEngine = field(default_factory=PrefectEngine)

    # Run state registry passed to `set_registry` of both engines.
    registry: IsRunStateRegistry = field(default_factory=RunStateRegistry)

    # Receives the `root_log` config via `set_config`.
    root_log: IsRootLogObjects = field(default_factory=RootLogObjects)

@dataclass
class Runtime(DataPublisher):
    """
    Top-level runtime holding the configuration and the objects configured by it.

    After dataclass initialisation the instance registers itself as the `_back`
    reference of `config`, `config.root_log` and `objects`, and then wires config
    entries to runtime objects through `reset_subscriptions`.
    """

    config: IsRuntimeConfig = field(default_factory=RuntimeConfig)
    objects: IsRuntimeObjects = field(default_factory=RuntimeObjects)

    def __post_init__(self):
        """Initialise the publisher base, set the back-references and subscribe."""
        super().__init__()

        # Each entry publisher needs a reference back to this runtime so that it can
        # call `reset_subscriptions` when one of its public attributes is assigned.
        for entry in (self.config, self.config.root_log, self.objects):
            entry._back = self

        self.reset_subscriptions()

    def reset_subscriptions(self):
        """
        Drop all current subscriptions and register the standard set again.

        Config entries are subscribed to the `set_config` methods of the matching
        runtime objects and to the job creator engine update; the `registry`, `local`
        and `prefect` objects are subscribed to the engines' `set_registry` methods
        and to the engine config refresh methods. The registration order is
        significant, as `subscribe` calls every callback once when it is registered.
        """
        config, objects = self.config, self.objects

        config.unsubscribe_all()
        objects.unsubscribe_all()

        # Config entry -> runtime object consuming that entry
        config.subscribe('job', objects.job_creator.set_config)
        config.subscribe('local', objects.local.set_config)
        config.subscribe('prefect', objects.prefect.set_config)
        config.subscribe('root_log', objects.root_log.set_config)

        # Entries that influence which engine the job creator should use
        for engine_related_item in ('local', 'prefect', 'engine'):
            config.subscribe(engine_related_item, self._update_job_creator_engine)

        # Both engines receive the registry
        objects.subscribe('registry', objects.local.set_registry)
        objects.subscribe('registry', objects.prefect.set_registry)

        # An engine object may require a different config class
        objects.subscribe('local', self._update_local_runner_config)
        objects.subscribe('prefect', self._update_prefect_engine_config)

    def _get_engine_config(self, engine_choice: EngineChoice):
        """Return the attribute of `self.config` named by `engine_choice`."""
        return getattr(self.config, engine_choice)

    def _set_engine_config(self, engine_choice: EngineChoice, engine_config: IsEngineConfig):
        """Assign `engine_config` to the attribute of `self.config` named by `engine_choice`."""
        return setattr(self.config, engine_choice, engine_config)

    def _get_engine(self, engine_choice: EngineChoice):
        """Return the attribute of `self.objects` named by `engine_choice`."""
        return getattr(self.objects, engine_choice)

    def _new_engine_config_if_new_cls(self, engine: IsEngine, engine_choice: EngineChoice) -> None:
        """
        Replace the stored engine config if its class differs from the one the engine reports.

        The class returned by `engine.get_config_cls()` is compared by identity with the
        class of the config currently stored for `engine_choice`; on mismatch a new
        instance, created without arguments, is stored instead.
        """
        # TODO: when parsing config from file is implemented, make sure that the new engine
        #       config classes here reparse the config files
        expected_cls = engine.get_config_cls()
        current_config = self._get_engine_config(engine_choice)

        if current_config.__class__ is expected_cls:
            return

        self._set_engine_config(engine_choice, expected_cls())

    def _update_local_runner_config(self, local_runner: IsEngine):
        """Subscription callback: refresh the `local` config for a given local engine."""
        self._new_engine_config_if_new_cls(local_runner, EngineChoice.LOCAL)

    def _update_prefect_engine_config(self, prefect_engine: IsEngine):
        """Subscription callback: refresh the `prefect` config for a given prefect engine."""
        self._new_engine_config_if_new_cls(prefect_engine, EngineChoice.PREFECT)

    def _update_job_creator_engine(self, _item_changed: Any):
        """
        Subscription callback: hand the currently selected engine to the job creator.

        The changed item itself is ignored; the engine is always looked up from
        `self.config.engine`.
        """
        selected_engine = self._get_engine(self.config.engine)
        self.objects.job_creator.set_engine(selected_engine)

    def _create_serializer_registry(self):
        """Return a new `SerializerRegistry` with the pandas, raw and JSON tar file serializers."""
        from omnipy.modules.json.serializers import JsonDatasetToTarFileSerializer
        from omnipy.modules.pandas.serializers import PandasDatasetToTarFileSerializer
        from omnipy.modules.raw.serializers import RawDatasetToTarFileSerializer

        serializer_registry = SerializerRegistry()

        # Registered in this order: pandas, raw, JSON
        for serializer_cls in (
                PandasDatasetToTarFileSerializer,
                RawDatasetToTarFileSerializer,
                JsonDatasetToTarFileSerializer,
        ):
            serializer_registry.register(serializer_cls)

        return serializer_registry

Classes

Runtime

class Runtime(
    config: omnipy.api.protocols.public.hub.IsRuntimeConfig = <factory>,
    objects: omnipy.api.protocols.public.hub.IsRuntimeObjects = <factory>
)

Runtime(config: omnipy.api.protocols.public.hub.IsRuntimeConfig = <factory>, objects: omnipy.api.protocols.public.hub.IsRuntimeObjects = <factory>)

View Source
@dataclass
class Runtime(DataPublisher):
    """
    Top-level runtime holding the configuration and the objects configured by it.

    After dataclass initialisation the instance registers itself as the `_back`
    reference of `config`, `config.root_log` and `objects`, and then wires config
    entries to runtime objects through `reset_subscriptions`.
    """

    config: IsRuntimeConfig = field(default_factory=RuntimeConfig)
    objects: IsRuntimeObjects = field(default_factory=RuntimeObjects)

    def __post_init__(self):
        """Initialise the publisher base, set the back-references and subscribe."""
        super().__init__()

        # Each entry publisher needs a reference back to this runtime so that it can
        # call `reset_subscriptions` when one of its public attributes is assigned.
        for entry in (self.config, self.config.root_log, self.objects):
            entry._back = self

        self.reset_subscriptions()

    def reset_subscriptions(self):
        """
        Drop all current subscriptions and register the standard set again.

        Config entries are subscribed to the `set_config` methods of the matching
        runtime objects and to the job creator engine update; the `registry`, `local`
        and `prefect` objects are subscribed to the engines' `set_registry` methods
        and to the engine config refresh methods. The registration order is
        significant, as `subscribe` calls every callback once when it is registered.
        """
        config, objects = self.config, self.objects

        config.unsubscribe_all()
        objects.unsubscribe_all()

        # Config entry -> runtime object consuming that entry
        config.subscribe('job', objects.job_creator.set_config)
        config.subscribe('local', objects.local.set_config)
        config.subscribe('prefect', objects.prefect.set_config)
        config.subscribe('root_log', objects.root_log.set_config)

        # Entries that influence which engine the job creator should use
        for engine_related_item in ('local', 'prefect', 'engine'):
            config.subscribe(engine_related_item, self._update_job_creator_engine)

        # Both engines receive the registry
        objects.subscribe('registry', objects.local.set_registry)
        objects.subscribe('registry', objects.prefect.set_registry)

        # An engine object may require a different config class
        objects.subscribe('local', self._update_local_runner_config)
        objects.subscribe('prefect', self._update_prefect_engine_config)

    def _get_engine_config(self, engine_choice: EngineChoice):
        """Return the attribute of `self.config` named by `engine_choice`."""
        return getattr(self.config, engine_choice)

    def _set_engine_config(self, engine_choice: EngineChoice, engine_config: IsEngineConfig):
        """Assign `engine_config` to the attribute of `self.config` named by `engine_choice`."""
        return setattr(self.config, engine_choice, engine_config)

    def _get_engine(self, engine_choice: EngineChoice):
        """Return the attribute of `self.objects` named by `engine_choice`."""
        return getattr(self.objects, engine_choice)

    def _new_engine_config_if_new_cls(self, engine: IsEngine, engine_choice: EngineChoice) -> None:
        """
        Replace the stored engine config if its class differs from the one the engine reports.

        The class returned by `engine.get_config_cls()` is compared by identity with the
        class of the config currently stored for `engine_choice`; on mismatch a new
        instance, created without arguments, is stored instead.
        """
        # TODO: when parsing config from file is implemented, make sure that the new engine
        #       config classes here reparse the config files
        expected_cls = engine.get_config_cls()
        current_config = self._get_engine_config(engine_choice)

        if current_config.__class__ is expected_cls:
            return

        self._set_engine_config(engine_choice, expected_cls())

    def _update_local_runner_config(self, local_runner: IsEngine):
        """Subscription callback: refresh the `local` config for a given local engine."""
        self._new_engine_config_if_new_cls(local_runner, EngineChoice.LOCAL)

    def _update_prefect_engine_config(self, prefect_engine: IsEngine):
        """Subscription callback: refresh the `prefect` config for a given prefect engine."""
        self._new_engine_config_if_new_cls(prefect_engine, EngineChoice.PREFECT)

    def _update_job_creator_engine(self, _item_changed: Any):
        """
        Subscription callback: hand the currently selected engine to the job creator.

        The changed item itself is ignored; the engine is always looked up from
        `self.config.engine`.
        """
        selected_engine = self._get_engine(self.config.engine)
        self.objects.job_creator.set_engine(selected_engine)

    def _create_serializer_registry(self):
        """Return a new `SerializerRegistry` with the pandas, raw and JSON tar file serializers."""
        from omnipy.modules.json.serializers import JsonDatasetToTarFileSerializer
        from omnipy.modules.pandas.serializers import PandasDatasetToTarFileSerializer
        from omnipy.modules.raw.serializers import RawDatasetToTarFileSerializer

        serializer_registry = SerializerRegistry()

        # Registered in this order: pandas, raw, JSON
        for serializer_cls in (
                PandasDatasetToTarFileSerializer,
                RawDatasetToTarFileSerializer,
                JsonDatasetToTarFileSerializer,
        ):
            serializer_registry.register(serializer_cls)

        return serializer_registry

Methods

__eq__
def __eq__(
    self,
    other
)

Return self==value.

Parameters:

Name Type Description Default
other
__setattr__
def __setattr__(
    self,
    key,
    value
)

Implement setattr(self, name, value).

Parameters:

Name Type Description Default
key
value
View Source
    def __setattr__(self, key, value):
        """
        Assign the attribute, then notify the callbacks subscribed to it.

        Every callback stored under `key` in `self._subscriptions` is called with the
        new `value`, in stored order. Keys without an entry trigger no callbacks.
        """
        super().__setattr__(key, value)

        if key not in self._subscriptions:
            return

        for notify in self._subscriptions[key]:
            notify(value)
reset_subscriptions
def reset_subscriptions(
    self
)

Resets all subscriptions for the current instance.

This function unsubscribes all existing subscriptions and then sets up new subscriptions for the config and objects members.

View Source
    def reset_subscriptions(self):
        """
        Drop all current subscriptions and register the standard set again.

        Config entries are subscribed to the `set_config` methods of the matching
        runtime objects and to the job creator engine update; the `registry`, `local`
        and `prefect` objects are subscribed to the engines' `set_registry` methods
        and to the engine config refresh methods. The registration order is
        significant, as `subscribe` calls every callback once when it is registered.
        """
        config, objects = self.config, self.objects

        config.unsubscribe_all()
        objects.unsubscribe_all()

        # Config entry -> runtime object consuming that entry
        config.subscribe('job', objects.job_creator.set_config)
        config.subscribe('local', objects.local.set_config)
        config.subscribe('prefect', objects.prefect.set_config)
        config.subscribe('root_log', objects.root_log.set_config)

        # Entries that influence which engine the job creator should use
        for engine_related_item in ('local', 'prefect', 'engine'):
            config.subscribe(engine_related_item, self._update_job_creator_engine)

        # Both engines receive the registry
        objects.subscribe('registry', objects.local.set_registry)
        objects.subscribe('registry', objects.prefect.set_registry)

        # An engine object may require a different config class
        objects.subscribe('local', self._update_local_runner_config)
        objects.subscribe('prefect', self._update_prefect_engine_config)
subscribe
def subscribe(
    self,
    config_item: str,
    callback_fun: Callable[[Any], NoneType]
)

Parameters:

Name Type Description Default
config_item str
callback_fun Callable[[Any], NoneType]
View Source
    def subscribe(self, config_item: str, callback_fun: Callable[[Any], None]):
        """
        Register `callback_fun` for `config_item` and call it once with the current value.

        Raises:
            AttributeError: if the instance has no attribute named `config_item`, or
                if the name starts with an underscore.
        """
        if not hasattr(self, config_item):
            raise AttributeError(f'No config items named "{config_item}"')

        if config_item.startswith('_'):
            raise AttributeError(f'Subscribing to private member "{config_item}" not allowed')

        self._subscriptions[config_item].append(callback_fun)

        # New subscribers are brought up to date right away
        current_value = getattr(self, config_item)
        callback_fun(current_value)
unsubscribe_all
def unsubscribe_all(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def unsubscribe_all(self) -> None:
        """
        Remove all subscriptions.

        The subscription store is replaced by a new one from `_subscribers_factory()`
        rather than emptied in place.
        """
        fresh_store = _subscribers_factory()
        self._subscriptions = fresh_store

RuntimeConfig

class RuntimeConfig(
    job: omnipy.api.protocols.public.config.IsJobConfig = <factory>,
    engine: omnipy.api.enums.EngineChoice = <EngineChoice.LOCAL: 'local'>,
    local: omnipy.api.protocols.public.config.IsLocalRunnerConfig = <factory>,
    prefect: omnipy.api.protocols.public.config.IsPrefectEngineConfig = <factory>,
    root_log: omnipy.api.protocols.public.config.IsRootLogConfig = <factory>
)

RuntimeConfig(job: omnipy.api.protocols.public.config.IsJobConfig = <factory>, engine: omnipy.api.enums.EngineChoice = <EngineChoice.LOCAL: 'local'>, local: omnipy.api.protocols.public.config.IsLocalRunnerConfig = <factory>, prefect: omnipy.api.protocols.public.config.IsPrefectEngineConfig = <factory>, root_log: omnipy.api.protocols.public.config.IsRootLogConfig = <factory>)

View Source
@dataclass

class RuntimeConfig(RuntimeEntryPublisher):

    """
    Configuration entries of the runtime.

    `Runtime.reset_subscriptions` subscribes to these fields and passes their values
    on to the corresponding members of `RuntimeObjects`.
    """

    # Job configuration; passed to `objects.job_creator.set_config`.
    job: IsJobConfig = field(default_factory=JobConfig)

    # Selects which member of `RuntimeObjects` is handed to the job creator as its
    # engine (see `Runtime._update_job_creator_engine`). Defaults to the local engine.
    engine: EngineChoice = EngineChoice.LOCAL

    # Configuration passed to `objects.local.set_config`.
    local: IsLocalRunnerConfig = field(default_factory=LocalRunnerConfig)

    # Configuration passed to `objects.prefect.set_config`.
    prefect: IsPrefectEngineConfig = field(default_factory=PrefectEngineConfig)

    # Configuration passed to `objects.root_log.set_config`. `Runtime.__post_init__`
    # also points the `_back` reference of this entry at the runtime.
    root_log: IsRootLogConfig = field(default_factory=RootLogConfigEntryPublisher)

Class variables

engine

Methods

__eq__
def __eq__(
    self,
    other
)

Return self==value.

Parameters:

Name Type Description Default
other
__setattr__
def __setattr__(
    self,
    key,
    value
)

Implement setattr(self, name, value).

Parameters:

Name Type Description Default
key
value
View Source
    def __setattr__(self, key, value):
        """
        Assign the attribute, then ask the owning runtime to rebuild its subscriptions.

        The rebuild is only requested for names that do not start with an underscore,
        and only once a back-reference (`self._back`) has been set.
        """
        super().__setattr__(key, value)

        if not hasattr(self, key):
            return

        # Private bookkeeping attributes (such as `_back` itself) never trigger a reset
        if key.startswith('_'):
            return

        if self._back is not None:
            self._back.reset_subscriptions()
subscribe
def subscribe(
    self,
    config_item: str,
    callback_fun: Callable[[Any], NoneType]
)

Parameters:

Name Type Description Default
config_item str
callback_fun Callable[[Any], NoneType]
View Source
    def subscribe(self, config_item: str, callback_fun: Callable[[Any], None]):
        """
        Register `callback_fun` for `config_item` and call it once with the current value.

        Raises:
            AttributeError: if the instance has no attribute named `config_item`, or
                if the name starts with an underscore.
        """
        if not hasattr(self, config_item):
            raise AttributeError(f'No config items named "{config_item}"')

        if config_item.startswith('_'):
            raise AttributeError(f'Subscribing to private member "{config_item}" not allowed')

        self._subscriptions[config_item].append(callback_fun)

        # New subscribers are brought up to date right away
        current_value = getattr(self, config_item)
        callback_fun(current_value)
unsubscribe_all
def unsubscribe_all(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def unsubscribe_all(self) -> None:
        """
        Remove all subscriptions.

        The subscription store is replaced by a new one from `_subscribers_factory()`
        rather than emptied in place.
        """
        fresh_store = _subscribers_factory()
        self._subscriptions = fresh_store

RuntimeObjects

class RuntimeObjects(
    job_creator: omnipy.api.protocols.private.compute.job_creator.IsJobConfigHolder = <factory>,
    local: omnipy.api.protocols.private.engine.IsEngine = <factory>,
    prefect: omnipy.api.protocols.private.engine.IsEngine = <factory>,
    registry: omnipy.api.protocols.private.log.IsRunStateRegistry = <factory>,
    root_log: omnipy.api.protocols.public.hub.IsRootLogObjects = <factory>
)

RuntimeObjects(job_creator: omnipy.api.protocols.private.compute.job_creator.IsJobConfigHolder = <factory>, local: omnipy.api.protocols.private.engine.IsEngine = <factory>, prefect: omnipy.api.protocols.private.engine.IsEngine = <factory>, registry: omnipy.api.protocols.private.log.IsRunStateRegistry = <factory>, root_log: omnipy.api.protocols.public.hub.IsRootLogObjects = <factory>)

View Source
@dataclass

class RuntimeObjects(RuntimeEntryPublisher):

    """
    Objects of the runtime that are configured from `RuntimeConfig`.

    `Runtime.reset_subscriptions` wires the config entries to the `set_config`
    methods of these objects and the `registry` to the engines' `set_registry`.
    """

    # Receives the `job` config via `set_config` and the selected engine via
    # `set_engine`.
    job_creator: IsJobConfigHolder = field(default_factory=_job_creator_factory)

    # Engine stored under the name 'local'; receives the `local` config and the
    # registry.
    local: IsEngine = field(default_factory=LocalRunner)

    # Engine stored under the name 'prefect'; receives the `prefect` config and the
    # registry.
    prefect: IsEngine = field(default_factory=PrefectEngine)

    # Run state registry passed to `set_registry` of both engines.
    registry: IsRunStateRegistry = field(default_factory=RunStateRegistry)

    # Receives the `root_log` config via `set_config`.
    root_log: IsRootLogObjects = field(default_factory=RootLogObjects)

Methods

__eq__
def __eq__(
    self,
    other
)

Return self==value.

Parameters:

Name Type Description Default
other
__setattr__
def __setattr__(
    self,
    key,
    value
)

Implement setattr(self, name, value).

Parameters:

Name Type Description Default
key
value
View Source
    def __setattr__(self, key, value):
        """
        Assign the attribute, then ask the owning runtime to rebuild its subscriptions.

        The rebuild is only requested for names that do not start with an underscore,
        and only once a back-reference (`self._back`) has been set.
        """
        super().__setattr__(key, value)

        if not hasattr(self, key):
            return

        # Private bookkeeping attributes (such as `_back` itself) never trigger a reset
        if key.startswith('_'):
            return

        if self._back is not None:
            self._back.reset_subscriptions()
subscribe
def subscribe(
    self,
    config_item: str,
    callback_fun: Callable[[Any], NoneType]
)

Parameters:

Name Type Description Default
config_item str
callback_fun Callable[[Any], NoneType]
View Source
    def subscribe(self, config_item: str, callback_fun: Callable[[Any], None]):
        """
        Register `callback_fun` for `config_item` and call it once with the current value.

        Raises:
            AttributeError: if the instance has no attribute named `config_item`, or
                if the name starts with an underscore.
        """
        if not hasattr(self, config_item):
            raise AttributeError(f'No config items named "{config_item}"')

        if config_item.startswith('_'):
            raise AttributeError(f'Subscribing to private member "{config_item}" not allowed')

        self._subscriptions[config_item].append(callback_fun)

        # New subscribers are brought up to date right away
        current_value = getattr(self, config_item)
        callback_fun(current_value)
unsubscribe_all
def unsubscribe_all(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def unsubscribe_all(self) -> None:
        """
        Remove all subscriptions.

        The subscription store is replaced by a new one from `_subscribers_factory()`
        rather than emptied in place.
        """
        fresh_store = _subscribers_factory()
        self._subscriptions = fresh_store