Skip to content

Module omnipy.compute.job_creator

Overview

View Source
from abc import ABCMeta

from datetime import datetime

from omnipy.api.protocols.private.compute.job_creator import IsJobCreator

from omnipy.api.protocols.private.engine import IsEngine

from omnipy.api.protocols.public.config import IsJobConfig

class JobCreator:
    """Hold an engine and a job config, and track nested context-manager entry.

    The object can be entered with ``with`` repeatedly: every entry increments
    ``nested_context_level`` and every exit decrements it. The time at which
    the outermost context was entered is kept for as long as at least one
    context is still active.
    """

    def __init__(self) -> None:
        """Start with no engine, no config and no active context."""
        self._engine: IsEngine | None = None
        self._config: IsJobConfig | None = None
        # Number of ``with`` blocks currently active on this object.
        self._nested_context_level: int = 0
        # Set by the outermost ``__enter__``; cleared by the matching ``__exit__``.
        self._time_of_cur_toplevel_nested_context_run: datetime | None = None

    def set_engine(self, engine: IsEngine) -> None:
        """Store ``engine``; it is subsequently returned by ``engine``."""
        self._engine = engine

    def set_config(self, config: IsJobConfig) -> None:
        """Store ``config``; it is subsequently returned by ``config``."""
        self._config = config

    def __enter__(self):
        """Enter one nesting level and return this object.

        Returns:
            This ``JobCreator``, so that ``with creator as c:`` binds ``c`` to
            the creator rather than to ``None``.
        """
        if self._nested_context_level == 0:
            # Only the outermost entry records a start time; nested entries
            # leave the existing timestamp untouched.
            self._time_of_cur_toplevel_nested_context_run = datetime.now()
        self._nested_context_level += 1
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave one nesting level.

        The exception arguments are ignored and ``None`` is returned, so an
        exception raised inside the ``with`` block is not suppressed.
        """
        self._nested_context_level -= 1
        if self._nested_context_level == 0:
            # The outermost context has ended; forget its start time.
            self._time_of_cur_toplevel_nested_context_run = None

    @property
    def engine(self) -> IsEngine | None:
        """The engine passed to ``set_engine``, or ``None`` if never set."""
        return self._engine

    @property
    def config(self) -> IsJobConfig | None:
        """The config passed to ``set_config``, or ``None`` if never set."""
        return self._config

    @property
    def nested_context_level(self) -> int:
        """Number of ``with`` blocks currently active on this object."""
        return self._nested_context_level

    @property
    def time_of_cur_toplevel_nested_context_run(self) -> datetime | None:
        """``datetime.now()`` value taken at the outermost entry, else ``None``."""
        return self._time_of_cur_toplevel_nested_context_run

class JobBaseMeta(ABCMeta):
    """Metaclass giving the classes it creates access to one ``JobCreator``.

    A ``JobCreator`` is instantiated once, when this metaclass body is
    executed, and stored on the metaclass as ``_job_creator_obj``. The
    properties below are defined on the metaclass, so they are read on the
    classes that use it rather than on instances of those classes.
    """

    _job_creator_obj = JobCreator()

    @property
    def job_creator(self) -> IsJobCreator:
        """Return the object stored in ``_job_creator_obj``."""
        creator = self._job_creator_obj
        return creator

    @property
    def nested_context_level(self) -> int:
        """Return the ``nested_context_level`` reported by ``job_creator``."""
        creator = self.job_creator
        return creator.nested_context_level

Classes

JobBaseMeta

class JobBaseMeta(
    /,
    *args,
    **kwargs
)
View Source
class JobBaseMeta(ABCMeta):
    """Metaclass giving the classes it creates access to one ``JobCreator``.

    A ``JobCreator`` is instantiated once, when this metaclass body is
    executed, and stored on the metaclass as ``_job_creator_obj``. The
    properties below are defined on the metaclass, so they are read on the
    classes that use it rather than on instances of those classes.
    """

    _job_creator_obj = JobCreator()

    @property
    def job_creator(self) -> IsJobCreator:
        """Return the object stored in ``_job_creator_obj``."""
        creator = self._job_creator_obj
        return creator

    @property
    def nested_context_level(self) -> int:
        """Return the ``nested_context_level`` reported by ``job_creator``."""
        creator = self.job_creator
        return creator.nested_context_level

Instance variables

job_creator
nested_context_level

JobCreator

class JobCreator(

)
View Source
class JobCreator:
    """Hold an engine and a job config, and track nested context-manager entry.

    The object can be entered with ``with`` repeatedly: every entry increments
    ``nested_context_level`` and every exit decrements it. The time at which
    the outermost context was entered is kept for as long as at least one
    context is still active.
    """

    def __init__(self) -> None:
        """Start with no engine, no config and no active context."""
        self._engine: IsEngine | None = None
        self._config: IsJobConfig | None = None
        # Number of ``with`` blocks currently active on this object.
        self._nested_context_level: int = 0
        # Set by the outermost ``__enter__``; cleared by the matching ``__exit__``.
        self._time_of_cur_toplevel_nested_context_run: datetime | None = None

    def set_engine(self, engine: IsEngine) -> None:
        """Store ``engine``; it is subsequently returned by ``engine``."""
        self._engine = engine

    def set_config(self, config: IsJobConfig) -> None:
        """Store ``config``; it is subsequently returned by ``config``."""
        self._config = config

    def __enter__(self):
        """Enter one nesting level and return this object.

        Returns:
            This ``JobCreator``, so that ``with creator as c:`` binds ``c`` to
            the creator rather than to ``None``.
        """
        if self._nested_context_level == 0:
            # Only the outermost entry records a start time; nested entries
            # leave the existing timestamp untouched.
            self._time_of_cur_toplevel_nested_context_run = datetime.now()
        self._nested_context_level += 1
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        """Leave one nesting level.

        The exception arguments are ignored and ``None`` is returned, so an
        exception raised inside the ``with`` block is not suppressed.
        """
        self._nested_context_level -= 1
        if self._nested_context_level == 0:
            # The outermost context has ended; forget its start time.
            self._time_of_cur_toplevel_nested_context_run = None

    @property
    def engine(self) -> IsEngine | None:
        """The engine passed to ``set_engine``, or ``None`` if never set."""
        return self._engine

    @property
    def config(self) -> IsJobConfig | None:
        """The config passed to ``set_config``, or ``None`` if never set."""
        return self._config

    @property
    def nested_context_level(self) -> int:
        """Number of ``with`` blocks currently active on this object."""
        return self._nested_context_level

    @property
    def time_of_cur_toplevel_nested_context_run(self) -> datetime | None:
        """``datetime.now()`` value taken at the outermost entry, else ``None``."""
        return self._time_of_cur_toplevel_nested_context_run

Instance variables

config
engine
nested_context_level
time_of_cur_toplevel_nested_context_run

Methods

__enter__
def __enter__(
    self
)
View Source
    def __enter__(self):

        if self._nested_context_level == 0:

            self._time_of_cur_toplevel_nested_context_run = datetime.now()

        self._nested_context_level += 1
__exit__
def __exit__(
    self,
    exc_type,
    exc_value,
    traceback
)

Parameters:

Name Type Description Default
exc_type
exc_value
traceback
View Source
    def __exit__(self, exc_type, exc_value, traceback):

        self._nested_context_level -= 1

        if self._nested_context_level == 0:

            self._time_of_cur_toplevel_nested_context_run = None
set_config
def set_config(
    self,
    config: omnipy.api.protocols.public.config.IsJobConfig
) -> None

Parameters:

Name Type Description Default
config IsJobConfig

Returns:

Type Description
NoneType
View Source
    def set_config(self, config: IsJobConfig) -> None:

        self._config = config
set_engine
def set_engine(
    self,
    engine: omnipy.api.protocols.private.engine.IsEngine
) -> None

Parameters:

Name Type Description Default
engine IsEngine

Returns:

Type Description
NoneType
View Source
    def set_engine(self, engine: IsEngine) -> None:

        self._engine = engine