Module omnipy.compute.job
Overview
View Source
from typing import cast, Type
from omnipy.api.protocols.private.compute.job import (IsFuncArgJobTemplateCallable,
IsJob,
IsJobTemplate,
IsTaskTemplateArgsJobTemplateCallable)
from omnipy.api.protocols.private.engine import IsEngine
from omnipy.api.protocols.public.compute import (IsDagFlow,
IsDagFlowTemplate,
IsFuncFlow,
IsFuncFlowTemplate,
IsLinearFlow,
IsLinearFlowTemplate,
IsTaskTemplate)
from omnipy.api.protocols.public.engine import (IsDagFlowRunnerEngine,
IsFuncFlowRunnerEngine,
IsLinearFlowRunnerEngine)
from omnipy.compute.func_job import FuncArgJobBase
from omnipy.compute.job import JobMixin, JobTemplateMixin
from omnipy.compute.mixins.flow_context import FlowContextJobMixin
from omnipy.compute.tasklist_job import TaskTemplateArgsJobBase
from omnipy.util.callable_decorator import callable_decorator_cls
class FlowBase:
    """Empty base class listed among the bases of every flow and flow template class here."""
def linear_flow_template_callable_decorator_cls(
    cls: Type['LinearFlowTemplate']
) -> IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate]:
    """Class decorator for ``LinearFlowTemplate``.

    Applies ``callable_decorator_cls`` to ``cls`` and returns the result. The ``cast`` only
    informs static type checkers; the decorated object is returned as-is at runtime.
    """
    decorated_cls = callable_decorator_cls(cls)
    return cast(
        IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate],
        decorated_cls,
    )
@linear_flow_template_callable_decorator_cls
class LinearFlowTemplate(JobTemplateMixin, FlowBase, TaskTemplateArgsJobBase):
    """Job template class for linear flows; its matching job class is ``LinearFlow``."""

    @classmethod
    def _get_job_subcls_for_apply(cls) -> Type[IsJob]:
        """Return ``LinearFlow``, the job class paired with this template."""
        job_subcls = cast(Type[IsLinearFlow], LinearFlow)
        return job_subcls
class LinearFlow(JobMixin, FlowBase, TaskTemplateArgsJobBase):
    """Job class for linear flows; its matching template class is ``LinearFlowTemplate``."""

    def _apply_engine_decorator(self, engine: IsEngine) -> None:
        """Hand this flow to ``self.engine.apply_linear_flow_decorator``.

        The flow itself and ``self._accept_call_func_decorator`` are passed to the engine.
        Nothing happens when ``self.engine`` is falsy.

        NOTE(review): the ``engine`` argument is never read; the engine is always taken
        from ``self.engine``.
        """
        if not self.engine:
            return

        runner_engine = cast(IsLinearFlowRunnerEngine, self.engine)
        runner_engine.apply_linear_flow_decorator(
            cast(IsLinearFlow, self),
            self._accept_call_func_decorator,
        )

    @classmethod
    def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:
        """Return ``LinearFlowTemplate``, the template class paired with this job."""
        template_subcls = cast(Type[IsLinearFlowTemplate], LinearFlowTemplate)
        return template_subcls
def dag_flow_template_callable_decorator_cls(
    cls: Type['DagFlowTemplate']
) -> IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate]:
    """Class decorator for ``DagFlowTemplate``.

    Applies ``callable_decorator_cls`` to ``cls`` and returns the result. The ``cast`` only
    informs static type checkers; the decorated object is returned as-is at runtime.
    """
    decorated_cls = callable_decorator_cls(cls)
    return cast(
        IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate],
        decorated_cls,
    )
@dag_flow_template_callable_decorator_cls
class DagFlowTemplate(JobTemplateMixin, FlowBase, TaskTemplateArgsJobBase):
    """Job template class for DAG flows; its matching job class is ``DagFlow``."""

    @classmethod
    def _get_job_subcls_for_apply(cls) -> Type[IsJob]:
        """Return ``DagFlow``, the job class paired with this template."""
        job_subcls = cast(Type[IsDagFlow], DagFlow)
        return job_subcls
class DagFlow(JobMixin, FlowBase, TaskTemplateArgsJobBase):
    """Job class for DAG flows; its matching template class is ``DagFlowTemplate``."""

    def _apply_engine_decorator(self, engine: IsEngine) -> None:
        """Hand this flow to ``self.engine.apply_dag_flow_decorator``.

        The flow itself and ``self._accept_call_func_decorator`` are passed to the engine.
        Nothing happens when ``self.engine`` is falsy.

        NOTE(review): the ``engine`` argument is never read; the engine is always taken
        from ``self.engine``.
        """
        if not self.engine:
            return

        runner_engine = cast(IsDagFlowRunnerEngine, self.engine)
        runner_engine.apply_dag_flow_decorator(
            cast(IsDagFlow, self),
            self._accept_call_func_decorator,
        )

    @classmethod
    def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:
        """Return ``DagFlowTemplate``, the template class paired with this job."""
        template_subcls = cast(Type[IsDagFlowTemplate], DagFlowTemplate)
        return template_subcls
def func_flow_template_callable_decorator_cls(
    cls: Type['FuncFlowTemplate']
) -> IsFuncArgJobTemplateCallable[IsFuncFlowTemplate]:
    """Class decorator for ``FuncFlowTemplate``.

    Applies ``callable_decorator_cls`` to ``cls`` and returns the result. The ``cast`` only
    informs static type checkers; the decorated object is returned as-is at runtime.
    """
    decorated_cls = callable_decorator_cls(cls)
    return cast(IsFuncArgJobTemplateCallable[IsFuncFlowTemplate], decorated_cls)
@func_flow_template_callable_decorator_cls
class FuncFlowTemplate(JobTemplateMixin, FlowBase, FuncArgJobBase):
    """Job template class for function flows; its matching job class is ``FuncFlow``."""

    @classmethod
    def _get_job_subcls_for_apply(cls) -> Type[IsJob]:
        """Return ``FuncFlow``, the job class paired with this template."""
        job_subcls = cast(Type[IsFuncFlow], FuncFlow)
        return job_subcls
class FuncFlow(JobMixin, FlowBase, FuncArgJobBase):
    """Job class for function flows; its matching template class is ``FuncFlowTemplate``."""

    def _apply_engine_decorator(self, engine: IsEngine) -> None:
        """Hand this flow to ``self.engine.apply_func_flow_decorator``.

        The flow itself and ``self._accept_call_func_decorator`` are passed to the engine.
        Nothing happens when ``self.engine`` is falsy.

        NOTE(review): the ``engine`` argument is never read; the engine is always taken
        from ``self.engine``.
        """
        if not self.engine:
            return

        runner_engine = cast(IsFuncFlowRunnerEngine, self.engine)
        runner_engine.apply_func_flow_decorator(
            cast(IsFuncFlow, self),
            self._accept_call_func_decorator,
        )

    @classmethod
    def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:
        """Return ``FuncFlowTemplate``, the template class paired with this job."""
        template_subcls = cast(Type[IsFuncFlowTemplate], FuncFlowTemplate)
        return template_subcls
# Register FlowContextJobMixin on each of the three flow job classes (not on the templates).
# These calls run once, when the module is imported.
LinearFlow.accept_mixin(FlowContextJobMixin)
DagFlow.accept_mixin(FlowContextJobMixin)
FuncFlow.accept_mixin(FlowContextJobMixin)
# TODO: Recursive replace - *args: Any -> *args: object, **kwargs: Any -> **kwargs: object
Classes
JobBase
Class variables
Static methods
accept_mixin
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mixin_cls | Type | | |

Returns:

| Type | Description |
|---|---|
| NoneType | |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    """Register ``mixin_cls`` on this class via ``cls._accept_mixin(..., update=True)``."""
    register_mixin = cls._accept_mixin
    register_mixin(mixin_cls, update=True)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
    """Forget all registered mixins and restore the original ``__init__`` signature.

    Clears ``cls._mixin_classes`` and ``cls._init_params_per_mixin_cls`` in place, then
    sets ``cls.__init__.__signature__`` back to ``cls._orig_init_signature``.
    """
    cls._mixin_classes.clear()
    cls._init_params_per_mixin_cls.clear()

    original_signature = cls._orig_init_signature
    cls.__init__.__signature__ = original_signature
Instance variables
Methods
`__eq__`
Return self==value.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| other | object | | |

Returns:

| Type | Description |
|---|---|
| bool | |
View Source
def __eq__(self, other: object) -> bool:
    """Compare two jobs by their init arguments.

    Returns ``NotImplemented`` when ``other`` is not a ``JobBase``. Otherwise the jobs are
    equal when both ``_get_init_args()`` and ``_get_init_kwargs()`` compare equal.
    """
    if isinstance(other, JobBase):
        same_init_args = self._get_init_args() == other._get_init_args()
        # Keyword arguments are only compared when the positional arguments matched.
        return same_init_args and self._get_init_kwargs() == other._get_init_kwargs()
    return NotImplemented
log
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| log_msg | str | | |
| level | int | | 20 |
| datetime_obj | datetime.datetime | | None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
if self._logger is not None:
create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()
_former_log_record_factory = logging.getLogRecordFactory()
if _former_log_record_factory.__name__ != '_log_record_editor':
def _log_record_editor(*args, **kwargs):
record = _former_log_record_factory(*args, **kwargs)
record.created = create_time
record.engine = f"[{record.name.split('.')[0].upper()}]"
if len(record.engine) < 9:
record.engine += ' '
return record
logging.setLogRecordFactory(_log_record_editor)
self._logger.log(level, log_msg)
JobMixin
class JobMixin(
*args,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs
)
Class variables
Static methods
accept_mixin
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| mixin_cls | Type | | |

Returns:

| Type | Description |
|---|---|
| NoneType | |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    """Register ``mixin_cls`` on this class via ``cls._accept_mixin(..., update=True)``."""
    register_mixin = cls._accept_mixin
    register_mixin(mixin_cls, update=True)
create_job
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| args | object | | |
| kwargs | object | | |

Returns:

| Type | Description |
|---|---|
| IsJob | |
View Source
@classmethod
def create_job(cls, *args: object, **kwargs: object) -> IsJob:
    """Create a job by forwarding all arguments to ``cls._create_job``.

    The ``cast`` to ``IsJobBase`` is for static typing only.
    """
    return cast(IsJobBase, cls)._create_job(*args, **kwargs)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
    """Forget all registered mixins and restore the original ``__init__`` signature.

    Clears ``cls._mixin_classes`` and ``cls._init_params_per_mixin_cls`` in place, then
    sets ``cls.__init__.__signature__`` back to ``cls._orig_init_signature``.
    """
    cls._mixin_classes.clear()
    cls._init_params_per_mixin_cls.clear()

    original_signature = cls._orig_init_signature
    cls.__init__.__signature__ = original_signature
Instance variables
Methods
`__call__`
Call self as a function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| args | object | | |
| kwargs | object | | |

Returns:

| Type | Description |
|---|---|
| object | |
View Source
def __call__(self, *args: object, **kwargs: object) -> object:
    """Run the job by forwarding all arguments to ``_call_job``.

    Any ``Exception`` raised by the call is logged at ``ERROR`` level through the job's
    own ``log`` method, using ``str()`` of the exception, and then re-raised unchanged.
    """
    job = cast(IsJobBase, self)
    try:
        result = job._call_job(*args, **kwargs)
    except Exception as exc:
        job.log(str(exc), level=logging.ERROR)
        raise
    return result
revise
Returns:
| Type | Description |
|---|---|
| IsJobTemplate | |
View Source
def revise(self) -> IsJobTemplate:
    """Return the job template produced by ``_revise()`` for this job.

    Wrapper metadata is copied from this job onto the template with ``update_wrapper``.
    ``updated=[]`` means no attribute of the template is merged via ``update()``, so the
    template's ``__dict__`` is not merged with the job's.
    """
    # update_wrapper returns the wrapper it was given, i.e. the new job template.
    return update_wrapper(cast(IsJobBase, self)._revise(), self, updated=[])
JobTemplateMixin
class JobTemplateMixin(
*args,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs
)
Static methods
create_job_template
def create_job_template(
*args: object,
**kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| args | object | | |
| kwargs | object | | |

Returns:

| Type | Description |
|---|---|
| IsJobTemplate | |
View Source
@classmethod
def create_job_template(cls: Type[IsJobBase], *args: object, **kwargs: object) -> IsJobTemplate:
    """Create a job template by forwarding all arguments to ``cls._create_job_template``."""
    job_template = cls._create_job_template(*args, **kwargs)
    return job_template
Methods
`__call__`
Call self as a function.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| args | object | | |
| kwargs | object | | |

Returns:

| Type | Description |
|---|---|
| object | |
View Source
def __call__(self, *args: object, **kwargs: object) -> object:
    """Call the template by forwarding all arguments to ``_call_job_template``.

    The ``cast`` to ``IsJobBase`` is for static typing only.
    """
    return cast(IsJobBase, self)._call_job_template(*args, **kwargs)
apply
Returns:
| Type | Description |
|---|---|
| IsJob | |
View Source
def apply(self) -> IsJob:
    """Return the job produced by ``_apply()`` for this template.

    Wrapper metadata is copied from this template onto the job with ``update_wrapper``.
    ``updated=[]`` means no attribute of the job is merged via ``update()``, so the job's
    ``__dict__`` is not merged with the template's.
    """
    # update_wrapper returns the wrapper it was given, i.e. the new job.
    return update_wrapper(cast(IsJobBase, self)._apply(), self, updated=[])
refine
def refine(
self,
*args: Any,
update: bool = True,
**kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| args | Any | | |
| update | bool | | True |
| kwargs | object | | |

Returns:

| Type | Description |
|---|---|
| IsJobTemplate | |
View Source
def refine(self, *args: Any, update: bool = True, **kwargs: object) -> IsJobTemplate:
    """Return the job template produced by ``_refine`` for the given arguments.

    All positional and keyword arguments, including ``update``, are forwarded unchanged.
    """
    return cast(IsJobBase, self)._refine(*args, update=update, **kwargs)
run
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| args | object | | |
| kwargs | object | | |

Returns:

| Type | Description |
|---|---|
| object | |
View Source
def run(self, *args: object, **kwargs: object) -> object:
    """Apply this template and immediately call the resulting job with the given arguments."""
    # TODO: Using JobTemplateMixin.run() inside flows should give error message
    job = self.apply()
    return job(*args, **kwargs)