Module omnipy.compute.task
Overview
View Source
from typing import cast, Type
from omnipy.api.protocols.private.compute.job import (IsFuncArgJobTemplateCallable,
IsJob,
IsJobTemplate)
from omnipy.api.protocols.private.engine import IsEngine
from omnipy.api.protocols.public.compute import IsTask, IsTaskTemplate
from omnipy.api.protocols.public.engine import IsTaskRunnerEngine
from omnipy.compute.func_job import FuncArgJobBase
from omnipy.compute.job import JobMixin, JobTemplateMixin
from omnipy.util.callable_decorator import callable_decorator_cls
def task_template_callable_decorator_cls(
cls: 'Type[TaskTemplate]') -> IsFuncArgJobTemplateCallable[IsTaskTemplate]:
return cast(IsFuncArgJobTemplateCallable[IsTaskTemplate], callable_decorator_cls(cls))
class TaskBase:
# TODO: Can this and FlowBase be replaced with IsTask/IsFlow, or similar?
...
@task_template_callable_decorator_cls
class TaskTemplate(JobTemplateMixin, TaskBase, FuncArgJobBase):
""""""
@classmethod
def _get_job_subcls_for_apply(cls) -> Type[IsJob]:
return cast(Type[IsTask], Task)
class Task(JobMixin, TaskBase, FuncArgJobBase):
def _apply_engine_decorator(self, engine: IsEngine) -> None:
if self.engine:
engine = cast(IsTaskRunnerEngine, self.engine)
self_with_mixins = cast(IsTask, self)
engine.apply_task_decorator(self_with_mixins, self._accept_call_func_decorator)
@classmethod
def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:
return cast(Type[IsTaskTemplate], TaskTemplate)
# TODO: Would we need the possibility to refine task templates by adding new task parameters?
Functions
task_template_callable_decorator_cls
def task_template_callable_decorator_cls(
cls: 'Type[TaskTemplate]'
) -> omnipy.api.protocols.private.compute.job.IsFuncArgJobTemplateCallable[omnipy.api.protocols.public.compute.IsTaskTemplate]
Returns:
Type | Description |
---|---|
IsFuncArgJobTemplateCallable[IsTaskTemplate] |
View Source
def task_template_callable_decorator_cls(
cls: 'Type[TaskTemplate]') -> IsFuncArgJobTemplateCallable[IsTaskTemplate]:
return cast(IsFuncArgJobTemplateCallable[IsTaskTemplate], callable_decorator_cls(cls))
Classes
Task
class Task(
*args,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs
)
View Source
class Task(JobMixin, TaskBase, FuncArgJobBase):
def _apply_engine_decorator(self, engine: IsEngine) -> None:
if self.engine:
engine = cast(IsTaskRunnerEngine, self.engine)
self_with_mixins = cast(IsTask, self)
engine.apply_task_decorator(self_with_mixins, self._accept_call_func_decorator)
@classmethod
def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:
return cast(Type[IsTaskTemplate], TaskTemplate)
Class variables
Static methods
accept_mixin
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mixin_cls |
Type |
Returns:
Type | Description |
---|---|
NoneType |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
cls._accept_mixin(mixin_cls, update=True)
create_job
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJob |
View Source
@classmethod
def create_job(cls, *args: object, **kwargs: object) -> IsJob:
cls_as_job_base = cast(IsJobBase, cls)
return cls_as_job_base._create_job(*args, **kwargs)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
cls._mixin_classes.clear()
cls._init_params_per_mixin_cls.clear()
cls.__init__.__signature__ = cls._orig_init_signature
Instance variables
Methods
call
Call self as a function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
object |
View Source
def __call__(self, *args: object, **kwargs: object) -> object:
self_as_job_base = cast(IsJobBase, self)
try:
return self_as_job_base._call_job(*args, **kwargs)
except Exception as e:
self_as_job_base.log(str(e), level=logging.ERROR)
raise
eq
Return self==value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
object |
Returns:
Type | Description |
---|---|
bool |
View Source
def __eq__(self, other: object) -> bool:
if not isinstance(other, JobBase):
return NotImplemented
return self._get_init_args() == other._get_init_args() \
and self._get_init_kwargs() == other._get_init_kwargs()
has_coroutine_func
Returns:
Type | Description |
---|---|
bool |
View Source
def has_coroutine_func(self) -> bool:
return asyncio.iscoroutinefunction(self._job_func)
log
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_msg |
str |
||
level |
int |
20 | |
datetime_obj |
datetime.datetime |
None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
if self._logger is not None:
create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()
_former_log_record_factory = logging.getLogRecordFactory()
if _former_log_record_factory.__name__ != '_log_record_editor':
def _log_record_editor(*args, **kwargs):
record = _former_log_record_factory(*args, **kwargs)
record.created = create_time
record.engine = f"[{record.name.split('.')[0].upper()}]"
if len(record.engine) < 9:
record.engine += ' '
return record
logging.setLogRecordFactory(_log_record_editor)
self._logger.log(level, log_msg)
revise
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
def revise(self) -> IsJobTemplate:
self_as_job_base = cast(IsJobBase, self)
job_template = self_as_job_base._revise()
update_wrapper(job_template, self, updated=[])
return job_template
TaskBase
View Source
class TaskBase:
# TODO: Can this and FlowBase be replaced with IsTask/IsFlow, or similar?
...
TaskTemplate
class TaskTemplate(
*args,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs
)
View Source
@task_template_callable_decorator_cls
class TaskTemplate(JobTemplateMixin, TaskBase, FuncArgJobBase):
""""""
@classmethod
def _get_job_subcls_for_apply(cls) -> Type[IsJob]:
return cast(Type[IsTask], Task)
Class variables
Static methods
accept_mixin
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mixin_cls |
Type |
Returns:
Type | Description |
---|---|
NoneType |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
cls._accept_mixin(mixin_cls, update=True)
create_job_template
def create_job_template(
*args: object,
**kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
@classmethod
def create_job_template(cls: Type[IsJobBase], *args: object, **kwargs: object) -> IsJobTemplate:
return cls._create_job_template(*args, **kwargs)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
cls._mixin_classes.clear()
cls._init_params_per_mixin_cls.clear()
cls.__init__.__signature__ = cls._orig_init_signature
Instance variables
Methods
call
call method at the class level which forward the call to instance-level call methods,
if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python only looks up special methods (with double underscores) at the class level, and not at the instance level. Used in the decoration process to forward call calls to the object level _obj_call() methods, if present.
See: https://stackoverflow.com/q/33824228
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
Type[+DecoratorClassT] |
View Source
def _forward_call_to_obj_if_callable(self, *args: object,
**kwargs: object) -> Type[DecoratorClassT]:
"""
__call__ method at the class level which forward the call to instance-level call methods,
if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python
only looks up special methods (with double underscores) at the class level, and not at the
instance level. Used in the decoration process to forward __call__ calls to the object level
_obj_call() methods, if present.
See: https://stackoverflow.com/q/33824228
"""
if hasattr(self, '_obj_call'):
return self._obj_call(*args, **kwargs)
if hasattr(self, '_wrapped_call'):
return self._wrapped_call(*args, **kwargs)
raise TypeError("'{}' object is not callable".format(self.__class__.__name__))
eq
Return self==value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
object |
Returns:
Type | Description |
---|---|
bool |
View Source
def __eq__(self, other: object) -> bool:
if not isinstance(other, JobBase):
return NotImplemented
return self._get_init_args() == other._get_init_args() \
and self._get_init_kwargs() == other._get_init_kwargs()
apply
Returns:
Type | Description |
---|---|
IsJob |
View Source
def apply(self) -> IsJob:
self_as_job_base = cast(IsJobBase, self)
job = self_as_job_base._apply()
update_wrapper(job, self, updated=[])
return job
has_coroutine_func
Returns:
Type | Description |
---|---|
bool |
View Source
def has_coroutine_func(self) -> bool:
return asyncio.iscoroutinefunction(self._job_func)
log
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_msg |
str |
||
level |
int |
20 | |
datetime_obj |
datetime.datetime |
None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
if self._logger is not None:
create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()
_former_log_record_factory = logging.getLogRecordFactory()
if _former_log_record_factory.__name__ != '_log_record_editor':
def _log_record_editor(*args, **kwargs):
record = _former_log_record_factory(*args, **kwargs)
record.created = create_time
record.engine = f"[{record.name.split('.')[0].upper()}]"
if len(record.engine) < 9:
record.engine += ' '
return record
logging.setLogRecordFactory(_log_record_editor)
self._logger.log(level, log_msg)
refine
def refine(
self,
*args: Any,
update: bool = True,
**kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
Any |
||
update |
bool |
True | |
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
def refine(self, *args: Any, update: bool = True, **kwargs: object) -> IsJobTemplate:
self_as_job_base = cast(IsJobBase, self)
return self_as_job_base._refine(*args, update=update, **kwargs)
run
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
object |
View Source
def run(self, *args: object, **kwargs: object) -> object:
# TODO: Using JobTemplateMixin.run() inside flows should give error message
return self.apply()(*args, **kwargs)