Module omnipy.compute.func_job
Overview
View Source
import asyncio
from typing import Callable
from omnipy.api.typedefs import GeneralDecorator
from omnipy.compute.job import JobBase
from omnipy.compute.mixins.func_signature import SignatureFuncJobBaseMixin
from omnipy.compute.mixins.iterate import IterateFuncJobBaseMixin
from omnipy.compute.mixins.params import ParamsFuncJobBaseMixin
from omnipy.compute.mixins.result_key import ResultKeyFuncJobBaseMixin
from omnipy.compute.mixins.serialize import SerializerFuncJobBaseMixin
class PlainFuncArgJobBase(JobBase):
    """Job base class wrapping a plain callable, referred to as the job function.

    The callable given to the constructor is stored on the instance and invoked
    through ``_call_job``, which forwards to ``_call_func``.
    """

    def __init__(self, job_func: Callable, *args: object, **kwargs: object) -> None:
        # Extra positional and keyword arguments are accepted but not used in this method.
        self._job_func = job_func

    def _get_init_args(self) -> tuple[object, ...]:
        """Return the positional init arguments: a one-element tuple with the job function."""
        return (self._job_func,)

    def has_coroutine_func(self) -> bool:
        """Report whether the stored job function is a coroutine function."""
        wrapped_func = self._job_func
        return asyncio.iscoroutinefunction(wrapped_func)

    def _call_job(self, *args: object, **kwargs: object) -> object:
        """Entry point for running the job; intended to be overloaded by mixins."""
        outcome = self._call_func(*args, **kwargs)
        return outcome

    def _call_func(self, *args: object, **kwargs: object) -> object:
        """
        Invoke the stored job function with the given arguments and return its result.

        Meant to be decorated (by job runners and by mixins that need early application)
        rather than overloaded through inheritance. Decorators are registered with the
        method _accept_call_func_decorator.
        """
        job_func = self._job_func
        return job_func(*args, **kwargs)

    def _accept_call_func_decorator(self, call_func_decorator: GeneralDecorator) -> None:
        """Wrap the current ``_call_func`` with the decorator and store it on this instance."""
        decorated_call_func = call_func_decorator(self._call_func)
        self._call_func = decorated_call_func  # type:ignore
# Extra level needed for mixins to be able to overload _call_job (and possibly other methods)
class FuncArgJobBase(PlainFuncArgJobBase):
    """Subclass of PlainFuncArgJobBase that defines no members of its own."""
# Register the job mixins on FuncArgJobBase, one accept_mixin call per mixin class,
# in the order listed here.
# NOTE(review): the registration order presumably affects how the mixins are layered;
# confirm before reordering these calls.
FuncArgJobBase.accept_mixin(SignatureFuncJobBaseMixin)
FuncArgJobBase.accept_mixin(IterateFuncJobBaseMixin)
FuncArgJobBase.accept_mixin(SerializerFuncJobBaseMixin)
FuncArgJobBase.accept_mixin(ResultKeyFuncJobBaseMixin)
FuncArgJobBase.accept_mixin(ParamsFuncJobBaseMixin)
Classes
FuncArgJobBase
class FuncArgJobBase(
job_func: Callable,
*args: object,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs: object
)
View Source
class FuncArgJobBase(PlainFuncArgJobBase):
    """Subclass of PlainFuncArgJobBase that defines no members of its own."""
Class variables
Static methods
accept_mixin
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mixin_cls | Type | | |
Returns:
Type | Description |
---|---|
NoneType |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    """Accept ``mixin_cls`` as a mixin of this class.

    Delegates to ``cls._accept_mixin``, passing ``update=True``.
    """
    cls._accept_mixin(mixin_cls, update=True)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
    """Forget all accepted mixins and restore the original ``__init__`` signature.

    Empties ``cls._mixin_classes`` and ``cls._init_params_per_mixin_cls`` in place, then
    assigns ``cls._orig_init_signature`` to ``cls.__init__.__signature__``.
    """
    accepted_mixins = cls._mixin_classes
    accepted_mixins.clear()

    params_per_mixin = cls._init_params_per_mixin_cls
    params_per_mixin.clear()

    original_signature = cls._orig_init_signature
    cls.__init__.__signature__ = original_signature
Instance variables
Methods
__eq__
Return self==value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | object | | |
Returns:
Type | Description |
---|---|
bool |
View Source
def __eq__(self, other: object) -> bool:
    """Compare two jobs by their init arguments.

    Returns ``NotImplemented`` when ``other`` is not a ``JobBase``. Otherwise the jobs
    are equal when both their init args and their init kwargs compare equal.
    """
    if isinstance(other, JobBase):
        same_init_args = self._get_init_args() == other._get_init_args()
        # Keyword arguments are only compared when the positional ones already match.
        return same_init_args and self._get_init_kwargs() == other._get_init_kwargs()
    return NotImplemented
has_coroutine_func
Returns:
Type | Description |
---|---|
bool |
View Source
def has_coroutine_func(self) -> bool:
    """Report whether the stored job function (``self._job_func``) is a coroutine function."""
    wrapped_func = self._job_func
    return asyncio.iscoroutinefunction(wrapped_func)
log
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_msg | str | | |
level | int | | 20 |
datetime_obj | datetime.datetime | | None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
if self._logger is not None:
create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()
_former_log_record_factory = logging.getLogRecordFactory()
if _former_log_record_factory.__name__ != '_log_record_editor':
def _log_record_editor(*args, **kwargs):
record = _former_log_record_factory(*args, **kwargs)
record.created = create_time
record.engine = f"[{record.name.split('.')[0].upper()}]"
if len(record.engine) < 9:
record.engine += ' '
return record
logging.setLogRecordFactory(_log_record_editor)
self._logger.log(level, log_msg)
PlainFuncArgJobBase
class PlainFuncArgJobBase(
job_func: Callable,
*args: object,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs: object
)
View Source
class PlainFuncArgJobBase(JobBase):
    """Job base class wrapping a plain callable, referred to as the job function.

    The callable given to the constructor is stored on the instance and invoked
    through ``_call_job``, which forwards to ``_call_func``.
    """

    def __init__(self, job_func: Callable, *args: object, **kwargs: object) -> None:
        # Extra positional and keyword arguments are accepted but not used in this method.
        self._job_func = job_func

    def _get_init_args(self) -> tuple[object, ...]:
        """Return the positional init arguments: a one-element tuple with the job function."""
        return (self._job_func,)

    def has_coroutine_func(self) -> bool:
        """Report whether the stored job function is a coroutine function."""
        wrapped_func = self._job_func
        return asyncio.iscoroutinefunction(wrapped_func)

    def _call_job(self, *args: object, **kwargs: object) -> object:
        """Entry point for running the job; intended to be overloaded by mixins."""
        outcome = self._call_func(*args, **kwargs)
        return outcome

    def _call_func(self, *args: object, **kwargs: object) -> object:
        """
        Invoke the stored job function with the given arguments and return its result.

        Meant to be decorated (by job runners and by mixins that need early application)
        rather than overloaded through inheritance. Decorators are registered with the
        method _accept_call_func_decorator.
        """
        job_func = self._job_func
        return job_func(*args, **kwargs)

    def _accept_call_func_decorator(self, call_func_decorator: GeneralDecorator) -> None:
        """Wrap the current ``_call_func`` with the decorator and store it on this instance."""
        decorated_call_func = call_func_decorator(self._call_func)
        self._call_func = decorated_call_func  # type:ignore
Class variables
Static methods
accept_mixin
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mixin_cls | Type | | |
Returns:
Type | Description |
---|---|
NoneType |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    """Accept ``mixin_cls`` as a mixin of this class.

    Delegates to ``cls._accept_mixin``, passing ``update=True``.
    """
    cls._accept_mixin(mixin_cls, update=True)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
    """Forget all accepted mixins and restore the original ``__init__`` signature.

    Empties ``cls._mixin_classes`` and ``cls._init_params_per_mixin_cls`` in place, then
    assigns ``cls._orig_init_signature`` to ``cls.__init__.__signature__``.
    """
    accepted_mixins = cls._mixin_classes
    accepted_mixins.clear()

    params_per_mixin = cls._init_params_per_mixin_cls
    params_per_mixin.clear()

    original_signature = cls._orig_init_signature
    cls.__init__.__signature__ = original_signature
Instance variables
Methods
__eq__
Return self==value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other | object | | |
Returns:
Type | Description |
---|---|
bool |
View Source
def __eq__(self, other: object) -> bool:
    """Compare two jobs by their init arguments.

    Returns ``NotImplemented`` when ``other`` is not a ``JobBase``. Otherwise the jobs
    are equal when both their init args and their init kwargs compare equal.
    """
    if isinstance(other, JobBase):
        same_init_args = self._get_init_args() == other._get_init_args()
        # Keyword arguments are only compared when the positional ones already match.
        return same_init_args and self._get_init_kwargs() == other._get_init_kwargs()
    return NotImplemented
has_coroutine_func
Returns:
Type | Description |
---|---|
bool |
View Source
def has_coroutine_func(self) -> bool:
    """Report whether the stored job function (``self._job_func``) is a coroutine function."""
    wrapped_func = self._job_func
    return asyncio.iscoroutinefunction(wrapped_func)
log
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_msg | str | | |
level | int | | 20 |
datetime_obj | datetime.datetime | | None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
if self._logger is not None:
create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()
_former_log_record_factory = logging.getLogRecordFactory()
if _former_log_record_factory.__name__ != '_log_record_editor':
def _log_record_editor(*args, **kwargs):
record = _former_log_record_factory(*args, **kwargs)
record.created = create_time
record.engine = f"[{record.name.split('.')[0].upper()}]"
if len(record.engine) < 9:
record.engine += ' '
return record
logging.setLogRecordFactory(_log_record_editor)
self._logger.log(level, log_msg)