Module omnipy.compute.flow
Overview
View Source
from typing import cast, Type
from omnipy.api.protocols.private.compute.job import (IsFuncArgJobTemplateCallable,
IsJob,
IsJobTemplate,
IsTaskTemplateArgsJobTemplateCallable)
from omnipy.api.protocols.private.engine import IsEngine
from omnipy.api.protocols.public.compute import (IsDagFlow,
IsDagFlowTemplate,
IsFuncFlow,
IsFuncFlowTemplate,
IsLinearFlow,
IsLinearFlowTemplate,
IsTaskTemplate)
from omnipy.api.protocols.public.engine import (IsDagFlowRunnerEngine,
IsFuncFlowRunnerEngine,
IsLinearFlowRunnerEngine)
from omnipy.compute.func_job import FuncArgJobBase
from omnipy.compute.job import JobMixin, JobTemplateMixin
from omnipy.compute.mixins.flow_context import FlowContextJobMixin
from omnipy.compute.tasklist_job import TaskTemplateArgsJobBase
from omnipy.util.callable_decorator import callable_decorator_cls
class FlowBase:
...
def linear_flow_template_callable_decorator_cls(
cls: Type['LinearFlowTemplate']
) -> IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate]:
return cast(IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate],
callable_decorator_cls(cls))
@linear_flow_template_callable_decorator_cls
class LinearFlowTemplate(JobTemplateMixin, FlowBase, TaskTemplateArgsJobBase):
@classmethod
def _get_job_subcls_for_apply(cls) -> Type[IsJob]:
return cast(Type[IsLinearFlow], LinearFlow)
class LinearFlow(JobMixin, FlowBase, TaskTemplateArgsJobBase):
def _apply_engine_decorator(self, engine: IsEngine) -> None:
if self.engine:
engine = cast(IsLinearFlowRunnerEngine, self.engine)
self_with_mixins = cast(IsLinearFlow, self)
engine.apply_linear_flow_decorator(self_with_mixins, self._accept_call_func_decorator)
@classmethod
def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:
return cast(Type[IsLinearFlowTemplate], LinearFlowTemplate)
def dag_flow_template_callable_decorator_cls(
cls: Type['DagFlowTemplate']
) -> IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate]:
return cast(IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate],
callable_decorator_cls(cls))
@dag_flow_template_callable_decorator_cls
class DagFlowTemplate(JobTemplateMixin, FlowBase, TaskTemplateArgsJobBase):
@classmethod
def _get_job_subcls_for_apply(cls) -> Type[IsJob]:
return cast(Type[IsDagFlow], DagFlow)
class DagFlow(JobMixin, FlowBase, TaskTemplateArgsJobBase):
def _apply_engine_decorator(self, engine: IsEngine) -> None:
if self.engine:
engine = cast(IsDagFlowRunnerEngine, self.engine)
self_with_mixins = cast(IsDagFlow, self)
engine.apply_dag_flow_decorator(self_with_mixins, self._accept_call_func_decorator)
@classmethod
def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:
return cast(Type[IsDagFlowTemplate], DagFlowTemplate)
def func_flow_template_callable_decorator_cls(
cls: Type['FuncFlowTemplate']) -> IsFuncArgJobTemplateCallable[IsFuncFlowTemplate]:
return cast(IsFuncArgJobTemplateCallable[IsFuncFlowTemplate], callable_decorator_cls(cls))
@func_flow_template_callable_decorator_cls
class FuncFlowTemplate(JobTemplateMixin, FlowBase, FuncArgJobBase):
@classmethod
def _get_job_subcls_for_apply(cls) -> Type[IsJob]:
return cast(Type[IsFuncFlow], FuncFlow)
class FuncFlow(JobMixin, FlowBase, FuncArgJobBase):
def _apply_engine_decorator(self, engine: IsEngine) -> None:
if self.engine:
engine = cast(IsFuncFlowRunnerEngine, self.engine)
self_with_mixins = cast(IsFuncFlow, self)
engine.apply_func_flow_decorator(self_with_mixins, self._accept_call_func_decorator)
@classmethod
def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:
return cast(Type[IsFuncFlowTemplate], FuncFlowTemplate)
LinearFlow.accept_mixin(FlowContextJobMixin)
DagFlow.accept_mixin(FlowContextJobMixin)
FuncFlow.accept_mixin(FlowContextJobMixin)
# TODO: Recursive replace - *args: Any -> *args: object, *kwargs: Any -> *kwargs: object
Functions
dag_flow_template_callable_decorator_cls
def dag_flow_template_callable_decorator_cls(
cls: Type[ForwardRef('DagFlowTemplate')]
) -> omnipy.api.protocols.private.compute.job.IsTaskTemplateArgsJobTemplateCallable[omnipy.api.protocols.public.compute.IsTaskTemplate, omnipy.api.protocols.public.compute.IsDagFlowTemplate]
Returns:
Type | Description |
---|---|
IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate] |
View Source
def dag_flow_template_callable_decorator_cls(
cls: Type['DagFlowTemplate']
) -> IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate]:
return cast(IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate],
callable_decorator_cls(cls))
func_flow_template_callable_decorator_cls
def func_flow_template_callable_decorator_cls(
cls: Type[ForwardRef('FuncFlowTemplate')]
) -> omnipy.api.protocols.private.compute.job.IsFuncArgJobTemplateCallable[omnipy.api.protocols.public.compute.IsFuncFlowTemplate]
Returns:
Type | Description |
---|---|
IsFuncArgJobTemplateCallable[IsFuncFlowTemplate] |
View Source
def func_flow_template_callable_decorator_cls(
cls: Type['FuncFlowTemplate']) -> IsFuncArgJobTemplateCallable[IsFuncFlowTemplate]:
return cast(IsFuncArgJobTemplateCallable[IsFuncFlowTemplate], callable_decorator_cls(cls))
linear_flow_template_callable_decorator_cls
def linear_flow_template_callable_decorator_cls(
cls: Type[ForwardRef('LinearFlowTemplate')]
) -> omnipy.api.protocols.private.compute.job.IsTaskTemplateArgsJobTemplateCallable[omnipy.api.protocols.public.compute.IsTaskTemplate, omnipy.api.protocols.public.compute.IsLinearFlowTemplate]
Returns:
Type | Description |
---|---|
IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate] |
View Source
def linear_flow_template_callable_decorator_cls(
cls: Type['LinearFlowTemplate']
) -> IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate]:
return cast(IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate],
callable_decorator_cls(cls))
Classes
DagFlow
class DagFlow(
*args,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs
)
View Source
class DagFlow(JobMixin, FlowBase, TaskTemplateArgsJobBase):
def _apply_engine_decorator(self, engine: IsEngine) -> None:
if self.engine:
engine = cast(IsDagFlowRunnerEngine, self.engine)
self_with_mixins = cast(IsDagFlow, self)
engine.apply_dag_flow_decorator(self_with_mixins, self._accept_call_func_decorator)
@classmethod
def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:
return cast(Type[IsDagFlowTemplate], DagFlowTemplate)
Class variables
Static methods
accept_mixin
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mixin_cls |
Type |
Returns:
Type | Description |
---|---|
NoneType |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
cls._accept_mixin(mixin_cls, update=True)
create_job
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJob |
View Source
@classmethod
def create_job(cls, *args: object, **kwargs: object) -> IsJob:
cls_as_job_base = cast(IsJobBase, cls)
return cls_as_job_base._create_job(*args, **kwargs)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
cls._mixin_classes.clear()
cls._init_params_per_mixin_cls.clear()
cls.__init__.__signature__ = cls._orig_init_signature
Instance variables
Methods
call
Call self as a function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
object |
View Source
def __call__(self, *args: object, **kwargs: object) -> object:
self_as_job_base = cast(IsJobBase, self)
try:
return self_as_job_base._call_job(*args, **kwargs)
except Exception as e:
self_as_job_base.log(str(e), level=logging.ERROR)
raise
eq
Return self==value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
object |
Returns:
Type | Description |
---|---|
bool |
View Source
def __eq__(self, other: object) -> bool:
if not isinstance(other, JobBase):
return NotImplemented
return self._get_init_args() == other._get_init_args() \
and self._get_init_kwargs() == other._get_init_kwargs()
has_coroutine_func
Returns:
Type | Description |
---|---|
bool |
View Source
def has_coroutine_func(self) -> bool:
return asyncio.iscoroutinefunction(self._job_func)
log
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_msg |
str |
||
level |
int |
20 | |
datetime_obj |
datetime.datetime |
None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
if self._logger is not None:
create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()
_former_log_record_factory = logging.getLogRecordFactory()
if _former_log_record_factory.__name__ != '_log_record_editor':
def _log_record_editor(*args, **kwargs):
record = _former_log_record_factory(*args, **kwargs)
record.created = create_time
record.engine = f"[{record.name.split('.')[0].upper()}]"
if len(record.engine) < 9:
record.engine += ' '
return record
logging.setLogRecordFactory(_log_record_editor)
self._logger.log(level, log_msg)
revise
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
def revise(self) -> IsJobTemplate:
self_as_job_base = cast(IsJobBase, self)
job_template = self_as_job_base._revise()
update_wrapper(job_template, self, updated=[])
return job_template
DagFlowTemplate
class DagFlowTemplate(
*args,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs
)
View Source
@dag_flow_template_callable_decorator_cls
class DagFlowTemplate(JobTemplateMixin, FlowBase, TaskTemplateArgsJobBase):
@classmethod
def _get_job_subcls_for_apply(cls) -> Type[IsJob]:
return cast(Type[IsDagFlow], DagFlow)
Class variables
Static methods
accept_mixin
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mixin_cls |
Type |
Returns:
Type | Description |
---|---|
NoneType |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
cls._accept_mixin(mixin_cls, update=True)
create_job_template
def create_job_template(
*args: object,
**kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
@classmethod
def create_job_template(cls: Type[IsJobBase], *args: object, **kwargs: object) -> IsJobTemplate:
return cls._create_job_template(*args, **kwargs)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
cls._mixin_classes.clear()
cls._init_params_per_mixin_cls.clear()
cls.__init__.__signature__ = cls._orig_init_signature
Instance variables
Methods
call
call method at the class level which forward the call to instance-level call methods,
if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python only looks up special methods (with double underscores) at the class level, and not at the instance level. Used in the decoration process to forward call calls to the object level _obj_call() methods, if present.
See: https://stackoverflow.com/q/33824228
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
Type[+DecoratorClassT] |
View Source
def _forward_call_to_obj_if_callable(self, *args: object,
**kwargs: object) -> Type[DecoratorClassT]:
"""
__call__ method at the class level which forward the call to instance-level call methods,
if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python
only looks up special methods (with double underscores) at the class level, and not at the
instance level. Used in the decoration process to forward __call__ calls to the object level
_obj_call() methods, if present.
See: https://stackoverflow.com/q/33824228
"""
if hasattr(self, '_obj_call'):
return self._obj_call(*args, **kwargs)
if hasattr(self, '_wrapped_call'):
return self._wrapped_call(*args, **kwargs)
raise TypeError("'{}' object is not callable".format(self.__class__.__name__))
eq
Return self==value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
object |
Returns:
Type | Description |
---|---|
bool |
View Source
def __eq__(self, other: object) -> bool:
if not isinstance(other, JobBase):
return NotImplemented
return self._get_init_args() == other._get_init_args() \
and self._get_init_kwargs() == other._get_init_kwargs()
apply
Returns:
Type | Description |
---|---|
IsJob |
View Source
def apply(self) -> IsJob:
self_as_job_base = cast(IsJobBase, self)
job = self_as_job_base._apply()
update_wrapper(job, self, updated=[])
return job
has_coroutine_func
Returns:
Type | Description |
---|---|
bool |
View Source
def has_coroutine_func(self) -> bool:
return asyncio.iscoroutinefunction(self._job_func)
log
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_msg |
str |
||
level |
int |
20 | |
datetime_obj |
datetime.datetime |
None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
if self._logger is not None:
create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()
_former_log_record_factory = logging.getLogRecordFactory()
if _former_log_record_factory.__name__ != '_log_record_editor':
def _log_record_editor(*args, **kwargs):
record = _former_log_record_factory(*args, **kwargs)
record.created = create_time
record.engine = f"[{record.name.split('.')[0].upper()}]"
if len(record.engine) < 9:
record.engine += ' '
return record
logging.setLogRecordFactory(_log_record_editor)
self._logger.log(level, log_msg)
refine
def refine(
self,
*args: Any,
update: bool = True,
**kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
Any |
||
update |
bool |
True | |
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
def refine(self, *args: Any, update: bool = True, **kwargs: object) -> IsJobTemplate:
self_as_job_base = cast(IsJobBase, self)
return self_as_job_base._refine(*args, update=update, **kwargs)
run
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
object |
View Source
def run(self, *args: object, **kwargs: object) -> object:
# TODO: Using JobTemplateMixin.run() inside flows should give error message
return self.apply()(*args, **kwargs)
FlowBase
View Source
class FlowBase:
...
FuncFlow
class FuncFlow(
*args,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs
)
View Source
class FuncFlow(JobMixin, FlowBase, FuncArgJobBase):
def _apply_engine_decorator(self, engine: IsEngine) -> None:
if self.engine:
engine = cast(IsFuncFlowRunnerEngine, self.engine)
self_with_mixins = cast(IsFuncFlow, self)
engine.apply_func_flow_decorator(self_with_mixins, self._accept_call_func_decorator)
@classmethod
def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:
return cast(Type[IsFuncFlowTemplate], FuncFlowTemplate)
Class variables
Static methods
accept_mixin
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mixin_cls |
Type |
Returns:
Type | Description |
---|---|
NoneType |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
cls._accept_mixin(mixin_cls, update=True)
create_job
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJob |
View Source
@classmethod
def create_job(cls, *args: object, **kwargs: object) -> IsJob:
cls_as_job_base = cast(IsJobBase, cls)
return cls_as_job_base._create_job(*args, **kwargs)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
cls._mixin_classes.clear()
cls._init_params_per_mixin_cls.clear()
cls.__init__.__signature__ = cls._orig_init_signature
Instance variables
Methods
call
Call self as a function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
object |
View Source
def __call__(self, *args: object, **kwargs: object) -> object:
self_as_job_base = cast(IsJobBase, self)
try:
return self_as_job_base._call_job(*args, **kwargs)
except Exception as e:
self_as_job_base.log(str(e), level=logging.ERROR)
raise
eq
Return self==value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
object |
Returns:
Type | Description |
---|---|
bool |
View Source
def __eq__(self, other: object) -> bool:
if not isinstance(other, JobBase):
return NotImplemented
return self._get_init_args() == other._get_init_args() \
and self._get_init_kwargs() == other._get_init_kwargs()
has_coroutine_func
Returns:
Type | Description |
---|---|
bool |
View Source
def has_coroutine_func(self) -> bool:
return asyncio.iscoroutinefunction(self._job_func)
log
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_msg |
str |
||
level |
int |
20 | |
datetime_obj |
datetime.datetime |
None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
if self._logger is not None:
create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()
_former_log_record_factory = logging.getLogRecordFactory()
if _former_log_record_factory.__name__ != '_log_record_editor':
def _log_record_editor(*args, **kwargs):
record = _former_log_record_factory(*args, **kwargs)
record.created = create_time
record.engine = f"[{record.name.split('.')[0].upper()}]"
if len(record.engine) < 9:
record.engine += ' '
return record
logging.setLogRecordFactory(_log_record_editor)
self._logger.log(level, log_msg)
revise
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
def revise(self) -> IsJobTemplate:
self_as_job_base = cast(IsJobBase, self)
job_template = self_as_job_base._revise()
update_wrapper(job_template, self, updated=[])
return job_template
FuncFlowTemplate
class FuncFlowTemplate(
*args,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs
)
View Source
@func_flow_template_callable_decorator_cls
class FuncFlowTemplate(JobTemplateMixin, FlowBase, FuncArgJobBase):
@classmethod
def _get_job_subcls_for_apply(cls) -> Type[IsJob]:
return cast(Type[IsFuncFlow], FuncFlow)
Class variables
Static methods
accept_mixin
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mixin_cls |
Type |
Returns:
Type | Description |
---|---|
NoneType |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
cls._accept_mixin(mixin_cls, update=True)
create_job_template
def create_job_template(
*args: object,
**kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
@classmethod
def create_job_template(cls: Type[IsJobBase], *args: object, **kwargs: object) -> IsJobTemplate:
return cls._create_job_template(*args, **kwargs)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
cls._mixin_classes.clear()
cls._init_params_per_mixin_cls.clear()
cls.__init__.__signature__ = cls._orig_init_signature
Instance variables
Methods
call
call method at the class level which forward the call to instance-level call methods,
if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python only looks up special methods (with double underscores) at the class level, and not at the instance level. Used in the decoration process to forward call calls to the object level _obj_call() methods, if present.
See: https://stackoverflow.com/q/33824228
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
Type[+DecoratorClassT] |
View Source
def _forward_call_to_obj_if_callable(self, *args: object,
**kwargs: object) -> Type[DecoratorClassT]:
"""
__call__ method at the class level which forward the call to instance-level call methods,
if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python
only looks up special methods (with double underscores) at the class level, and not at the
instance level. Used in the decoration process to forward __call__ calls to the object level
_obj_call() methods, if present.
See: https://stackoverflow.com/q/33824228
"""
if hasattr(self, '_obj_call'):
return self._obj_call(*args, **kwargs)
if hasattr(self, '_wrapped_call'):
return self._wrapped_call(*args, **kwargs)
raise TypeError("'{}' object is not callable".format(self.__class__.__name__))
eq
Return self==value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
object |
Returns:
Type | Description |
---|---|
bool |
View Source
def __eq__(self, other: object) -> bool:
if not isinstance(other, JobBase):
return NotImplemented
return self._get_init_args() == other._get_init_args() \
and self._get_init_kwargs() == other._get_init_kwargs()
apply
Returns:
Type | Description |
---|---|
IsJob |
View Source
def apply(self) -> IsJob:
self_as_job_base = cast(IsJobBase, self)
job = self_as_job_base._apply()
update_wrapper(job, self, updated=[])
return job
has_coroutine_func
Returns:
Type | Description |
---|---|
bool |
View Source
def has_coroutine_func(self) -> bool:
return asyncio.iscoroutinefunction(self._job_func)
log
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_msg |
str |
||
level |
int |
20 | |
datetime_obj |
datetime.datetime |
None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
if self._logger is not None:
create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()
_former_log_record_factory = logging.getLogRecordFactory()
if _former_log_record_factory.__name__ != '_log_record_editor':
def _log_record_editor(*args, **kwargs):
record = _former_log_record_factory(*args, **kwargs)
record.created = create_time
record.engine = f"[{record.name.split('.')[0].upper()}]"
if len(record.engine) < 9:
record.engine += ' '
return record
logging.setLogRecordFactory(_log_record_editor)
self._logger.log(level, log_msg)
refine
def refine(
self,
*args: Any,
update: bool = True,
**kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
Any |
||
update |
bool |
True | |
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
def refine(self, *args: Any, update: bool = True, **kwargs: object) -> IsJobTemplate:
self_as_job_base = cast(IsJobBase, self)
return self_as_job_base._refine(*args, update=update, **kwargs)
run
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
object |
View Source
def run(self, *args: object, **kwargs: object) -> object:
# TODO: Using JobTemplateMixin.run() inside flows should give error message
return self.apply()(*args, **kwargs)
LinearFlow
class LinearFlow(
*args,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs
)
View Source
class LinearFlow(JobMixin, FlowBase, TaskTemplateArgsJobBase):
def _apply_engine_decorator(self, engine: IsEngine) -> None:
if self.engine:
engine = cast(IsLinearFlowRunnerEngine, self.engine)
self_with_mixins = cast(IsLinearFlow, self)
engine.apply_linear_flow_decorator(self_with_mixins, self._accept_call_func_decorator)
@classmethod
def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:
return cast(Type[IsLinearFlowTemplate], LinearFlowTemplate)
Class variables
Static methods
accept_mixin
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mixin_cls |
Type |
Returns:
Type | Description |
---|---|
NoneType |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
cls._accept_mixin(mixin_cls, update=True)
create_job
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJob |
View Source
@classmethod
def create_job(cls, *args: object, **kwargs: object) -> IsJob:
cls_as_job_base = cast(IsJobBase, cls)
return cls_as_job_base._create_job(*args, **kwargs)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
cls._mixin_classes.clear()
cls._init_params_per_mixin_cls.clear()
cls.__init__.__signature__ = cls._orig_init_signature
Instance variables
Methods
call
Call self as a function.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
object |
View Source
def __call__(self, *args: object, **kwargs: object) -> object:
self_as_job_base = cast(IsJobBase, self)
try:
return self_as_job_base._call_job(*args, **kwargs)
except Exception as e:
self_as_job_base.log(str(e), level=logging.ERROR)
raise
eq
Return self==value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
object |
Returns:
Type | Description |
---|---|
bool |
View Source
def __eq__(self, other: object) -> bool:
if not isinstance(other, JobBase):
return NotImplemented
return self._get_init_args() == other._get_init_args() \
and self._get_init_kwargs() == other._get_init_kwargs()
has_coroutine_func
Returns:
Type | Description |
---|---|
bool |
View Source
def has_coroutine_func(self) -> bool:
return asyncio.iscoroutinefunction(self._job_func)
log
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_msg |
str |
||
level |
int |
20 | |
datetime_obj |
datetime.datetime |
None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
if self._logger is not None:
create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()
_former_log_record_factory = logging.getLogRecordFactory()
if _former_log_record_factory.__name__ != '_log_record_editor':
def _log_record_editor(*args, **kwargs):
record = _former_log_record_factory(*args, **kwargs)
record.created = create_time
record.engine = f"[{record.name.split('.')[0].upper()}]"
if len(record.engine) < 9:
record.engine += ' '
return record
logging.setLogRecordFactory(_log_record_editor)
self._logger.log(level, log_msg)
revise
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
def revise(self) -> IsJobTemplate:
self_as_job_base = cast(IsJobBase, self)
job_template = self_as_job_base._revise()
update_wrapper(job_template, self, updated=[])
return job_template
LinearFlowTemplate
class LinearFlowTemplate(
*args,
iterate_over_data_files: bool = False,
persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
result_key: str | None = None,
fixed_params: Optional[Mapping[str, object]] = None,
param_key_map: Optional[Mapping[str, str]] = None,
name: str | None = None,
**kwargs
)
View Source
@linear_flow_template_callable_decorator_cls
class LinearFlowTemplate(JobTemplateMixin, FlowBase, TaskTemplateArgsJobBase):
@classmethod
def _get_job_subcls_for_apply(cls) -> Type[IsJob]:
return cast(Type[IsLinearFlow], LinearFlow)
Class variables
Static methods
accept_mixin
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mixin_cls |
Type |
Returns:
Type | Description |
---|---|
NoneType |
View Source
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
cls._accept_mixin(mixin_cls, update=True)
create_job_template
def create_job_template(
*args: object,
**kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
@classmethod
def create_job_template(cls: Type[IsJobBase], *args: object, **kwargs: object) -> IsJobTemplate:
return cls._create_job_template(*args, **kwargs)
reset_mixins
View Source
@classmethod
def reset_mixins(cls):
cls._mixin_classes.clear()
cls._init_params_per_mixin_cls.clear()
cls.__init__.__signature__ = cls._orig_init_signature
Instance variables
Methods
call
call method at the class level which forward the call to instance-level call methods,
if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python only looks up special methods (with double underscores) at the class level, and not at the instance level. Used in the decoration process to forward call calls to the object level _obj_call() methods, if present.
See: https://stackoverflow.com/q/33824228
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
Type[+DecoratorClassT] |
View Source
def _forward_call_to_obj_if_callable(self, *args: object,
**kwargs: object) -> Type[DecoratorClassT]:
"""
__call__ method at the class level which forward the call to instance-level call methods,
if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python
only looks up special methods (with double underscores) at the class level, and not at the
instance level. Used in the decoration process to forward __call__ calls to the object level
_obj_call() methods, if present.
See: https://stackoverflow.com/q/33824228
"""
if hasattr(self, '_obj_call'):
return self._obj_call(*args, **kwargs)
if hasattr(self, '_wrapped_call'):
return self._wrapped_call(*args, **kwargs)
raise TypeError("'{}' object is not callable".format(self.__class__.__name__))
eq
Return self==value.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
other |
object |
Returns:
Type | Description |
---|---|
bool |
View Source
def __eq__(self, other: object) -> bool:
if not isinstance(other, JobBase):
return NotImplemented
return self._get_init_args() == other._get_init_args() \
and self._get_init_kwargs() == other._get_init_kwargs()
apply
Returns:
Type | Description |
---|---|
IsJob |
View Source
def apply(self) -> IsJob:
self_as_job_base = cast(IsJobBase, self)
job = self_as_job_base._apply()
update_wrapper(job, self, updated=[])
return job
has_coroutine_func
Returns:
Type | Description |
---|---|
bool |
View Source
def has_coroutine_func(self) -> bool:
return asyncio.iscoroutinefunction(self._job_func)
log
Parameters:
Name | Type | Description | Default |
---|---|---|---|
log_msg |
str |
||
level |
int |
20 | |
datetime_obj |
datetime.datetime |
None |
View Source
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
if self._logger is not None:
create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()
_former_log_record_factory = logging.getLogRecordFactory()
if _former_log_record_factory.__name__ != '_log_record_editor':
def _log_record_editor(*args, **kwargs):
record = _former_log_record_factory(*args, **kwargs)
record.created = create_time
record.engine = f"[{record.name.split('.')[0].upper()}]"
if len(record.engine) < 9:
record.engine += ' '
return record
logging.setLogRecordFactory(_log_record_editor)
self._logger.log(level, log_msg)
refine
def refine(
self,
*args: Any,
update: bool = True,
**kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
Any |
||
update |
bool |
True | |
kwargs |
object |
Returns:
Type | Description |
---|---|
IsJobTemplate |
View Source
def refine(self, *args: Any, update: bool = True, **kwargs: object) -> IsJobTemplate:
self_as_job_base = cast(IsJobBase, self)
return self_as_job_base._refine(*args, update=update, **kwargs)
run
Parameters:
Name | Type | Description | Default |
---|---|---|---|
args |
object |
||
kwargs |
object |
Returns:
Type | Description |
---|---|
object |
View Source
def run(self, *args: object, **kwargs: object) -> object:
# TODO: Using JobTemplateMixin.run() inside flows should give error message
return self.apply()(*args, **kwargs)