Module omnipy.log.registry
Overview
View Source
from collections import defaultdict
from datetime import datetime
from typing import DefaultDict
from omnipy.api.enums import RunState, RunStateLogMessages
from omnipy.api.protocols.private.compute.mixins import IsUniquelyNamedJob
from omnipy.log.mixin import LogMixin
class RunStateRegistry(LogMixin):
    """Registry that tracks the run state of uniquely named jobs.

    For every registered job the registry keeps the job object, its current
    ``RunState``, the unique names of the jobs currently in each state, and
    the time at which each (unique name, state) pair was entered. Every
    accepted state change is reported through ``self.log``.
    """

    def __init__(self) -> None:
        # unique name -> job object
        self._jobs: dict[str, IsUniquelyNamedJob] = {}
        # unique name -> current state
        self._job_states: dict[str, RunState] = {}
        # state -> unique names of the jobs currently in that state
        self._state_jobs: DefaultDict[RunState, list[str]] = defaultdict(list)
        # (unique name, state) -> time at which the job entered that state
        self._job_state_datetime: dict[tuple[str, RunState], datetime] = {}

        super().__init__()

    def get_job_state(self, job: IsUniquelyNamedJob) -> RunState:
        """Return the state currently recorded for ``job``.

        Raises:
            KeyError: If no state is recorded under the job's unique name.
        """
        return self._job_states[job.unique_name]

    def get_job_state_datetime(self, job: IsUniquelyNamedJob, state: RunState) -> datetime:
        """Return the time recorded when ``job`` was set to ``state``.

        Raises:
            KeyError: If nothing is recorded for that (unique name, state) pair.
        """
        return self._job_state_datetime[(job.unique_name, state)]

    def all_jobs(self, state: RunState | None = None) -> tuple[IsUniquelyNamedJob, ...]:
        """Return the registered jobs, optionally only those currently in ``state``.

        Args:
            state: If given, only jobs currently in this state are returned;
                if ``None``, every registered job is returned.
        """
        if state is None:
            return tuple(self._jobs.values())

        # Use ``.get`` rather than indexing: indexing the defaultdict would insert
        # an empty list for every state that is merely queried.
        job_unique_names = self._state_jobs.get(state, ())
        return tuple(self._jobs[unique_name] for unique_name in job_unique_names)

    def set_job_state(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Record that ``job`` has entered ``state`` and log the change.

        A job whose unique name is not yet known is registered as new; otherwise
        the existing registration is updated. The timestamp is taken before any
        registration work is done.

        Raises:
            ValueError: If a new job does not start in ``RunState.INITIALIZED``
                or the requested transition is not allowed.
        """
        cur_datetime = datetime.now()

        if job.unique_name in self._jobs:
            self._update_job_registration(job, state)
        else:
            self._register_new_job(job, state)

        self._update_job_stats(job, state, cur_datetime)
        self._log_state_change(job, state)

    def _other_job_registered_with_same_unique_name(self, job: IsUniquelyNamedJob) -> bool:
        """Return True if a *different* job object is registered under ``job.unique_name``."""
        other_job_same_unique_name = self._jobs.get(job.unique_name)
        # Compare by identity; relying on truthiness would misclassify a job
        # object that happens to evaluate as falsy.
        return other_job_same_unique_name is not None and other_job_same_unique_name is not job

    def _update_job_registration(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Handle a state change for a job whose unique name is already registered.

        If the name belongs to another job object, ``job`` is asked to
        regenerate its unique name until it no longer clashes and is then
        registered as a new job. Otherwise the transition is validated and the
        job is removed from the list of its previous state.
        """
        # TODO: Reimplement logic using a state machine, e.g. "transitions" package
        if self._other_job_registered_with_same_unique_name(job):
            while self._other_job_registered_with_same_unique_name(job):
                job.regenerate_unique_name()
            self._register_new_job(job, state)
        else:
            prev_state = self._job_states[job.unique_name]
            # Only a step to the directly following state is accepted
            # (assumes RunState members support integer addition - TODO confirm).
            if state == prev_state + 1:
                self._state_jobs[prev_state].remove(job.unique_name)
            else:
                self._raise_job_error(
                    job,
                    f'Transitioning from state {prev_state.name} '
                    f'to state {state.name} is not allowed',
                )

    def _register_new_job(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Store ``job`` under its unique name; its state must be ``RunState.INITIALIZED``."""
        if state != RunState.INITIALIZED:
            self._raise_job_error(
                job,
                f'Initial state of job must be "INITIALIZED", not "{state.name}"',
            )

        self._jobs[job.unique_name] = job

    def _update_job_stats(
        self,
        job: IsUniquelyNamedJob,
        state: RunState,
        cur_datetime: datetime,
    ) -> None:
        """Record ``state`` as the current state of ``job``, entered at ``cur_datetime``."""
        self._job_states[job.unique_name] = state
        self._state_jobs[state].append(job.unique_name)
        self._job_state_datetime[(job.unique_name, state)] = cur_datetime

    def _log_state_change(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Log the message registered for ``state``, stamped with the recorded entry time."""
        # The message template is looked up in RunStateLogMessages by the state's
        # name and formatted with the job's unique name.
        log_msg = RunStateLogMessages[state.name].value.format(job.unique_name)
        datetime_obj = self.get_job_state_datetime(job, state)
        self.log(log_msg, datetime_obj=datetime_obj)

    def _raise_job_error(self, job: IsUniquelyNamedJob, msg: str) -> None:
        """Raise a ``ValueError`` for ``job``; this method never returns normally."""
        raise ValueError(f'Error in job "{job.unique_name}": {msg}')
Classes
RunStateRegistry
View Source
class RunStateRegistry(LogMixin):
    """Registry that tracks the run state of uniquely named jobs.

    For every registered job the registry keeps the job object, its current
    ``RunState``, the unique names of the jobs currently in each state, and
    the time at which each (unique name, state) pair was entered. Every
    accepted state change is reported through ``self.log``.
    """

    def __init__(self) -> None:
        # unique name -> job object
        self._jobs: dict[str, IsUniquelyNamedJob] = {}
        # unique name -> current state
        self._job_states: dict[str, RunState] = {}
        # state -> unique names of the jobs currently in that state
        self._state_jobs: DefaultDict[RunState, list[str]] = defaultdict(list)
        # (unique name, state) -> time at which the job entered that state
        self._job_state_datetime: dict[tuple[str, RunState], datetime] = {}

        super().__init__()

    def get_job_state(self, job: IsUniquelyNamedJob) -> RunState:
        """Return the state currently recorded for ``job``.

        Raises:
            KeyError: If no state is recorded under the job's unique name.
        """
        return self._job_states[job.unique_name]

    def get_job_state_datetime(self, job: IsUniquelyNamedJob, state: RunState) -> datetime:
        """Return the time recorded when ``job`` was set to ``state``.

        Raises:
            KeyError: If nothing is recorded for that (unique name, state) pair.
        """
        return self._job_state_datetime[(job.unique_name, state)]

    def all_jobs(self, state: RunState | None = None) -> tuple[IsUniquelyNamedJob, ...]:
        """Return the registered jobs, optionally only those currently in ``state``.

        Args:
            state: If given, only jobs currently in this state are returned;
                if ``None``, every registered job is returned.
        """
        if state is None:
            return tuple(self._jobs.values())

        # Use ``.get`` rather than indexing: indexing the defaultdict would insert
        # an empty list for every state that is merely queried.
        job_unique_names = self._state_jobs.get(state, ())
        return tuple(self._jobs[unique_name] for unique_name in job_unique_names)

    def set_job_state(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Record that ``job`` has entered ``state`` and log the change.

        A job whose unique name is not yet known is registered as new; otherwise
        the existing registration is updated. The timestamp is taken before any
        registration work is done.

        Raises:
            ValueError: If a new job does not start in ``RunState.INITIALIZED``
                or the requested transition is not allowed.
        """
        cur_datetime = datetime.now()

        if job.unique_name in self._jobs:
            self._update_job_registration(job, state)
        else:
            self._register_new_job(job, state)

        self._update_job_stats(job, state, cur_datetime)
        self._log_state_change(job, state)

    def _other_job_registered_with_same_unique_name(self, job: IsUniquelyNamedJob) -> bool:
        """Return True if a *different* job object is registered under ``job.unique_name``."""
        other_job_same_unique_name = self._jobs.get(job.unique_name)
        # Compare by identity; relying on truthiness would misclassify a job
        # object that happens to evaluate as falsy.
        return other_job_same_unique_name is not None and other_job_same_unique_name is not job

    def _update_job_registration(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Handle a state change for a job whose unique name is already registered.

        If the name belongs to another job object, ``job`` is asked to
        regenerate its unique name until it no longer clashes and is then
        registered as a new job. Otherwise the transition is validated and the
        job is removed from the list of its previous state.
        """
        # TODO: Reimplement logic using a state machine, e.g. "transitions" package
        if self._other_job_registered_with_same_unique_name(job):
            while self._other_job_registered_with_same_unique_name(job):
                job.regenerate_unique_name()
            self._register_new_job(job, state)
        else:
            prev_state = self._job_states[job.unique_name]
            # Only a step to the directly following state is accepted
            # (assumes RunState members support integer addition - TODO confirm).
            if state == prev_state + 1:
                self._state_jobs[prev_state].remove(job.unique_name)
            else:
                self._raise_job_error(
                    job,
                    f'Transitioning from state {prev_state.name} '
                    f'to state {state.name} is not allowed',
                )

    def _register_new_job(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Store ``job`` under its unique name; its state must be ``RunState.INITIALIZED``."""
        if state != RunState.INITIALIZED:
            self._raise_job_error(
                job,
                f'Initial state of job must be "INITIALIZED", not "{state.name}"',
            )

        self._jobs[job.unique_name] = job

    def _update_job_stats(
        self,
        job: IsUniquelyNamedJob,
        state: RunState,
        cur_datetime: datetime,
    ) -> None:
        """Record ``state`` as the current state of ``job``, entered at ``cur_datetime``."""
        self._job_states[job.unique_name] = state
        self._state_jobs[state].append(job.unique_name)
        self._job_state_datetime[(job.unique_name, state)] = cur_datetime

    def _log_state_change(self, job: IsUniquelyNamedJob, state: RunState) -> None:
        """Log the message registered for ``state``, stamped with the recorded entry time."""
        # The message template is looked up in RunStateLogMessages by the state's
        # name and formatted with the job's unique name.
        log_msg = RunStateLogMessages[state.name].value.format(job.unique_name)
        datetime_obj = self.get_job_state_datetime(job, state)
        self.log(log_msg, datetime_obj=datetime_obj)

    def _raise_job_error(self, job: IsUniquelyNamedJob, msg: str) -> None:
        """Raise a ``ValueError`` for ``job``; this method never returns normally."""
        raise ValueError(f'Error in job "{job.unique_name}": {msg}')
Instance variables
Methods
all_jobs
def all_jobs(
self,
state: omnipy.api.enums.RunState | None = None
) -> tuple[omnipy.api.protocols.private.compute.mixins.IsUniquelyNamedJob, ...]
Parameters:
Name | Type | Description | Default |
---|---|---|---|
state | omnipy.api.enums.RunState | | None |
Returns:
Type | Description |
---|---|
tuple[IsUniquelyNamedJob, ...] |
View Source
def all_jobs(self, state: RunState | None = None) -> tuple[IsUniquelyNamedJob, ...]:
    """Return the registered jobs, optionally only those currently in ``state``.

    Args:
        state: If given, only jobs currently in this state are returned;
            if ``None``, every registered job is returned.
    """
    if state is None:
        return tuple(self._jobs.values())

    # Use ``.get`` rather than indexing: ``_state_jobs`` is a defaultdict, so
    # indexing would insert an empty list for every state that is merely queried.
    job_unique_names = self._state_jobs.get(state, ())
    return tuple(self._jobs[unique_name] for unique_name in job_unique_names)
get_job_state
def get_job_state(
self,
job: omnipy.api.protocols.private.compute.mixins.IsUniquelyNamedJob
) -> omnipy.api.enums.RunState
Parameters:
Name | Type | Description | Default |
---|---|---|---|
job | IsUniquelyNamedJob | | |
Returns:
Type | Description |
---|---|
RunState |
View Source
def get_job_state(self, job: IsUniquelyNamedJob) -> RunState:
    """Look up the state currently recorded under the unique name of ``job``.

    Raises:
        KeyError: If no state is recorded under that unique name.
    """
    states_by_name = self._job_states
    return states_by_name[job.unique_name]
get_job_state_datetime
def get_job_state_datetime(
self,
job: omnipy.api.protocols.private.compute.mixins.IsUniquelyNamedJob,
state: omnipy.api.enums.RunState
) -> datetime.datetime
Parameters:
Name | Type | Description | Default |
---|---|---|---|
job |
IsUniquelyNamedJob |
||
state |
RunState |
Returns:
Type | Description |
---|---|
datetime.datetime |
View Source
def get_job_state_datetime(self, job: IsUniquelyNamedJob, state: RunState) -> datetime:
    """Look up the time recorded for ``job`` (by unique name) and ``state``.

    Raises:
        KeyError: If nothing is recorded for that (unique name, state) pair.
    """
    timestamps = self._job_state_datetime
    lookup_key = (job.unique_name, state)
    return timestamps[lookup_key]
log
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_msg | str | | |
level | int | | 20 |
datetime_obj | datetime.datetime | | None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
    """Log ``log_msg`` through ``self._logger``, stamped with a chosen creation time.

    Does nothing when ``self._logger`` is ``None``.

    On first use a process-wide log record factory is installed (wrapping the
    one that was active before). It adds an ``engine`` attribute to every
    record, derived from the first component of the logger name. While a call
    to this method is emitting its message, the factory also overrides
    ``record.created`` with the time derived from ``datetime_obj`` (or the
    current time when ``datetime_obj`` is ``None``).

    Args:
        log_msg: The message to log.
        level: Logging level passed on to the logger.
        datetime_obj: Time to report as the creation time of the record.
    """
    if self._logger is None:
        return

    create_time = (
        time.mktime(datetime_obj.timetuple()) if datetime_obj is not None else time.time())

    record_factory = logging.getLogRecordFactory()
    if record_factory.__name__ != '_log_record_editor':
        _former_log_record_factory = record_factory

        def _log_record_editor(*args, **kwargs):
            record = _former_log_record_factory(*args, **kwargs)
            # Read the override at record-creation time. Capturing `create_time`
            # in the closure would freeze the value of the very first call, since
            # the factory is only installed once.
            pending_created = getattr(_log_record_editor, 'pending_created', None)
            if pending_created is not None:
                record.created = pending_created
            record.engine = f"[{record.name.split('.')[0].upper()}]"
            if len(record.engine) < 9:
                record.engine += ' '
            return record

        _log_record_editor.pending_created = None
        logging.setLogRecordFactory(_log_record_editor)
        record_factory = _log_record_editor

    # NOTE: the override is shared process-wide state and is not thread-safe;
    # it is only set for the duration of this call.
    record_factory.pending_created = create_time
    try:
        self._logger.log(level, log_msg)
    finally:
        record_factory.pending_created = None
set_job_state
def set_job_state(
self,
job: omnipy.api.protocols.private.compute.mixins.IsUniquelyNamedJob,
state: omnipy.api.enums.RunState
) -> None
Parameters:
Name | Type | Description | Default |
---|---|---|---|
job | IsUniquelyNamedJob | | |
state | RunState | | |
Returns:
Type | Description |
---|---|
NoneType |
View Source
def set_job_state(self, job: IsUniquelyNamedJob, state: RunState) -> None:
    """Record that ``job`` has entered ``state`` and log the change.

    The timestamp is taken first. A job whose unique name is already present
    in ``self._jobs`` goes through ``_update_job_registration``; any other job
    goes through ``_register_new_job``. Afterwards the state bookkeeping is
    updated and the change is logged.
    """
    timestamp = datetime.now()

    already_known = job.unique_name in self._jobs
    # Pick the registration step up front; only the chosen method is looked up.
    register = self._update_job_registration if already_known else self._register_new_job
    register(job, state)

    self._update_job_stats(job, state, timestamp)
    self._log_state_change(job, state)