Skip to content

Module omnipy.log.registry

Overview

View Source
from collections import defaultdict

from datetime import datetime

from typing import DefaultDict

from omnipy.api.enums import RunState, RunStateLogMessages

from omnipy.api.protocols.private.compute.mixins import IsUniquelyNamedJob

from omnipy.log.mixin import LogMixin

class RunStateRegistry(LogMixin):
    """Registry that tracks the run state of uniquely named jobs.

    For every registered job the registry stores the job object, its current
    state, the names of the jobs currently in each state, and the datetime at
    which each (job, state) pair was recorded. Every state change is logged
    through the ``log`` method inherited from ``LogMixin``.
    """

    def __init__(self) -> None:
        # unique name -> job object
        self._jobs: dict[str, IsUniquelyNamedJob] = {}
        # unique name -> current state of the job
        self._job_states: dict[str, RunState] = {}
        # state -> unique names of the jobs currently in that state
        self._state_jobs: DefaultDict[RunState, list[str]] = defaultdict(list)
        # (unique name, state) -> datetime at which the job was set to the state
        self._job_state_datetime: dict[tuple[str, RunState], datetime] = {}

        super().__init__()

    def get_job_state(self, job: IsUniquelyNamedJob) -> RunState:
        """Return the current state of ``job``.

        Raises:
            KeyError: If no state is recorded under the job's unique name.
        """
        return self._job_states[job.unique_name]

    def get_job_state_datetime(self, job: IsUniquelyNamedJob, state: RunState) -> datetime:
        """Return the datetime at which ``job`` was set to ``state``.

        Raises:
            KeyError: If that (job, state) combination was never recorded.
        """
        return self._job_state_datetime[(job.unique_name, state)]

    def all_jobs(self, state: RunState | None = None) -> tuple[IsUniquelyNamedJob, ...]:
        """Return the registered jobs, optionally only those in ``state``.

        Args:
            state: If given, only jobs currently in this state are returned.
                If ``None``, all registered jobs are returned.
        """
        if state is None:
            return tuple(self._jobs.values())

        # Use .get() instead of indexing: indexing a defaultdict inserts an
        # empty list for a missing key, so a read-only query would otherwise
        # mutate the registry.
        job_unique_names = self._state_jobs.get(state, ())
        return tuple(self._jobs[unique_name] for unique_name in job_unique_names)

    def set_job_state(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Register ``job`` (if needed) and record that it is now in ``state``.

        Raises:
            ValueError: If a new job is not registered with state
                ``RunState.INITIALIZED``, or if an already registered job is
                moved to any state other than the one directly following its
                current state.
        """
        # Taken before any bookkeeping so that the stored timestamp is as
        # close as possible to the time of the call.
        cur_datetime = datetime.now()

        if job.unique_name in self._jobs:
            self._update_job_registration(job, state)
        else:
            self._register_new_job(job, state)

        self._update_job_stats(job, state, cur_datetime)
        self._log_state_change(job, state)

    def _other_job_registered_with_same_unique_name(self, job: IsUniquelyNamedJob) -> bool:
        """Return True if a different job object is registered under ``job.unique_name``."""
        registered_job = self._jobs.get(job.unique_name)

        # Compare by identity, not truthiness: a registered job object that
        # happens to evaluate as falsy must still count as registered.
        return registered_job is not None and registered_job is not job

    def _update_job_registration(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Handle ``set_job_state`` for a job whose unique name is already registered.

        If the name belongs to another job object, ``job`` gets a regenerated
        unique name and is registered as a new job. Otherwise the state
        transition is validated and the job is removed from the list of its
        previous state.
        """
        # TODO: Reimplement logic using a state machine, e.g. "transitions" package

        if self._other_job_registered_with_same_unique_name(job):
            # Keep regenerating until the name no longer collides with
            # another registered job.
            while self._other_job_registered_with_same_unique_name(job):
                job.regenerate_unique_name()

            self._register_new_job(job, state)
        else:
            prev_state = self._job_states[job.unique_name]

            # Only a step to the directly following state is allowed
            # (relies on RunState supporting integer addition).
            if state == prev_state + 1:
                self._state_jobs[prev_state].remove(job.unique_name)
            else:
                self._raise_job_error(
                    job,
                    f'Transitioning from state {prev_state.name} '
                    f'to state {state.name} is not allowed',
                )

    def _register_new_job(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Store ``job`` under its unique name; the state must be ``INITIALIZED``."""
        if state != RunState.INITIALIZED:
            self._raise_job_error(
                job,
                f'Initial state must be "INITIALIZED", not "{state.name}"',
            )

        self._jobs[job.unique_name] = job

    def _update_job_stats(
        self,
        job: IsUniquelyNamedJob,
        state: RunState,
        cur_datetime: datetime,
    ) -> None:
        """Record ``state`` as the current state of ``job``, reached at ``cur_datetime``."""
        self._job_states[job.unique_name] = state
        self._state_jobs[state].append(job.unique_name)
        self._job_state_datetime[(job.unique_name, state)] = cur_datetime

    def _log_state_change(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Log the message for ``state``, stamped with the recorded state datetime."""
        # The message template is looked up by the name of the state and
        # formatted with the unique name of the job.
        log_msg = RunStateLogMessages[state.name].value.format(job.unique_name)
        datetime_obj = self.get_job_state_datetime(job, state)

        self.log(log_msg, datetime_obj=datetime_obj)

    def _raise_job_error(self, job: IsUniquelyNamedJob, msg: str) -> None:
        """Always raise a ``ValueError`` whose message is prefixed with the job's unique name."""
        raise ValueError(f'Error in job "{job.unique_name}": {msg}')

Classes

RunStateRegistry

class RunStateRegistry(

)
View Source
class RunStateRegistry(LogMixin):
    """Registry that tracks the run state of uniquely named jobs.

    For every registered job the registry stores the job object, its current
    state, the names of the jobs currently in each state, and the datetime at
    which each (job, state) pair was recorded. Every state change is logged
    through the ``log`` method inherited from ``LogMixin``.
    """

    def __init__(self) -> None:
        # unique name -> job object
        self._jobs: dict[str, IsUniquelyNamedJob] = {}
        # unique name -> current state of the job
        self._job_states: dict[str, RunState] = {}
        # state -> unique names of the jobs currently in that state
        self._state_jobs: DefaultDict[RunState, list[str]] = defaultdict(list)
        # (unique name, state) -> datetime at which the job was set to the state
        self._job_state_datetime: dict[tuple[str, RunState], datetime] = {}

        super().__init__()

    def get_job_state(self, job: IsUniquelyNamedJob) -> RunState:
        """Return the current state of ``job``.

        Raises:
            KeyError: If no state is recorded under the job's unique name.
        """
        return self._job_states[job.unique_name]

    def get_job_state_datetime(self, job: IsUniquelyNamedJob, state: RunState) -> datetime:
        """Return the datetime at which ``job`` was set to ``state``.

        Raises:
            KeyError: If that (job, state) combination was never recorded.
        """
        return self._job_state_datetime[(job.unique_name, state)]

    def all_jobs(self, state: RunState | None = None) -> tuple[IsUniquelyNamedJob, ...]:
        """Return the registered jobs, optionally only those in ``state``.

        Args:
            state: If given, only jobs currently in this state are returned.
                If ``None``, all registered jobs are returned.
        """
        if state is None:
            return tuple(self._jobs.values())

        # Use .get() instead of indexing: indexing a defaultdict inserts an
        # empty list for a missing key, so a read-only query would otherwise
        # mutate the registry.
        job_unique_names = self._state_jobs.get(state, ())
        return tuple(self._jobs[unique_name] for unique_name in job_unique_names)

    def set_job_state(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Register ``job`` (if needed) and record that it is now in ``state``.

        Raises:
            ValueError: If a new job is not registered with state
                ``RunState.INITIALIZED``, or if an already registered job is
                moved to any state other than the one directly following its
                current state.
        """
        # Taken before any bookkeeping so that the stored timestamp is as
        # close as possible to the time of the call.
        cur_datetime = datetime.now()

        if job.unique_name in self._jobs:
            self._update_job_registration(job, state)
        else:
            self._register_new_job(job, state)

        self._update_job_stats(job, state, cur_datetime)
        self._log_state_change(job, state)

    def _other_job_registered_with_same_unique_name(self, job: IsUniquelyNamedJob) -> bool:
        """Return True if a different job object is registered under ``job.unique_name``."""
        registered_job = self._jobs.get(job.unique_name)

        # Compare by identity, not truthiness: a registered job object that
        # happens to evaluate as falsy must still count as registered.
        return registered_job is not None and registered_job is not job

    def _update_job_registration(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Handle ``set_job_state`` for a job whose unique name is already registered.

        If the name belongs to another job object, ``job`` gets a regenerated
        unique name and is registered as a new job. Otherwise the state
        transition is validated and the job is removed from the list of its
        previous state.
        """
        # TODO: Reimplement logic using a state machine, e.g. "transitions" package

        if self._other_job_registered_with_same_unique_name(job):
            # Keep regenerating until the name no longer collides with
            # another registered job.
            while self._other_job_registered_with_same_unique_name(job):
                job.regenerate_unique_name()

            self._register_new_job(job, state)
        else:
            prev_state = self._job_states[job.unique_name]

            # Only a step to the directly following state is allowed
            # (relies on RunState supporting integer addition).
            if state == prev_state + 1:
                self._state_jobs[prev_state].remove(job.unique_name)
            else:
                self._raise_job_error(
                    job,
                    f'Transitioning from state {prev_state.name} '
                    f'to state {state.name} is not allowed',
                )

    def _register_new_job(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Store ``job`` under its unique name; the state must be ``INITIALIZED``."""
        if state != RunState.INITIALIZED:
            self._raise_job_error(
                job,
                f'Initial state must be "INITIALIZED", not "{state.name}"',
            )

        self._jobs[job.unique_name] = job

    def _update_job_stats(
        self,
        job: IsUniquelyNamedJob,
        state: RunState,
        cur_datetime: datetime,
    ) -> None:
        """Record ``state`` as the current state of ``job``, reached at ``cur_datetime``."""
        self._job_states[job.unique_name] = state
        self._state_jobs[state].append(job.unique_name)
        self._job_state_datetime[(job.unique_name, state)] = cur_datetime

    def _log_state_change(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Log the message for ``state``, stamped with the recorded state datetime."""
        # The message template is looked up by the name of the state and
        # formatted with the unique name of the job.
        log_msg = RunStateLogMessages[state.name].value.format(job.unique_name)
        datetime_obj = self.get_job_state_datetime(job, state)

        self.log(log_msg, datetime_obj=datetime_obj)

    def _raise_job_error(self, job: IsUniquelyNamedJob, msg: str) -> None:
        """Always raise a ``ValueError`` whose message is prefixed with the job's unique name."""
        raise ValueError(f'Error in job "{job.unique_name}": {msg}')

Instance variables

logger

Methods

all_jobs
def all_jobs(
    self,
    state: omnipy.api.enums.RunState | None = None
) -> tuple[omnipy.api.protocols.private.compute.mixins.IsUniquelyNamedJob, ...]

Parameters:

Name Type Description Default
state omnipy.api.enums.RunState None

Returns:

Type Description
tuple[IsUniquelyNamedJob, ...]
View Source
    def all_jobs(self, state: RunState | None = None) -> tuple[IsUniquelyNamedJob, ...]:
        """Return the registered jobs, optionally only those in ``state``.

        Args:
            state: If given, only jobs currently in this state are returned.
                If ``None``, all registered jobs are returned.
        """
        if state is None:
            return tuple(self._jobs.values())

        # Use .get() instead of indexing: ``_state_jobs`` is a defaultdict, and
        # indexing it with a missing key would insert an empty list, so a
        # read-only query would otherwise mutate the registry.
        job_unique_names = self._state_jobs.get(state, ())
        return tuple(self._jobs[unique_name] for unique_name in job_unique_names)
get_job_state
def get_job_state(
    self,
    job: omnipy.api.protocols.private.compute.mixins.IsUniquelyNamedJob
) -> omnipy.api.enums.RunState

Parameters:

Name Type Description Default
job IsUniquelyNamedJob

Returns:

Type Description
RunState
View Source
    def get_job_state(self, job: IsUniquelyNamedJob) -> RunState:
        """Look up the state currently recorded for ``job``.

        The lookup is keyed on the job's unique name; a ``KeyError`` is raised
        if no state is recorded under that name.
        """
        job_name = job.unique_name
        return self._job_states[job_name]
get_job_state_datetime
def get_job_state_datetime(
    self,
    job: omnipy.api.protocols.private.compute.mixins.IsUniquelyNamedJob,
    state: omnipy.api.enums.RunState
) -> datetime.datetime

Parameters:

Name Type Description Default
job IsUniquelyNamedJob
state RunState

Returns:

Type Description
datetime.datetime
View Source
    def get_job_state_datetime(self, job: IsUniquelyNamedJob, state: RunState) -> datetime:
        """Look up the datetime recorded for ``job`` being set to ``state``.

        The lookup is keyed on the pair (unique name of the job, state); a
        ``KeyError`` is raised if that pair was never recorded.
        """
        lookup_key = (job.unique_name, state)
        return self._job_state_datetime[lookup_key]
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        if self._logger is not None:

            create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()

            _former_log_record_factory = logging.getLogRecordFactory()

            if _former_log_record_factory.__name__ != '_log_record_editor':

                def _log_record_editor(*args, **kwargs):

                    record = _former_log_record_factory(*args, **kwargs)

                    record.created = create_time

                    record.engine = f"[{record.name.split('.')[0].upper()}]"

                    if len(record.engine) < 9:

                        record.engine += ' '

                    return record

                logging.setLogRecordFactory(_log_record_editor)

            self._logger.log(level, log_msg)
set_job_state
def set_job_state(
    self,
    job: omnipy.api.protocols.private.compute.mixins.IsUniquelyNamedJob,
    state: omnipy.api.enums.RunState
) -> None

Parameters:

Name Type Description Default
job IsUniquelyNamedJob
state RunState

Returns:

Type Description
NoneType
View Source
    def set_job_state(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Record that ``job`` has entered ``state`` and log the change.

        A job whose unique name is not yet known is registered as a new job;
        otherwise its existing registration is updated. The state, the
        per-state job list and the timestamp of the change are then stored,
        and the state change is logged.
        """
        # Timestamp is taken up front, before any registration work.
        timestamp = datetime.now()

        already_registered = job.unique_name in self._jobs
        if not already_registered:
            self._register_new_job(job, state)
        else:
            self._update_job_registration(job, state)

        self._update_job_stats(job, state, timestamp)
        self._log_state_change(job, state)