Skip to content

omnipy.compute.flow

CLASS DESCRIPTION
DagFlow
DagFlowTemplateCore
FlowBase
FuncFlow
FuncFlowTemplateCore
LinearFlow
LinearFlowTemplateCore
ATTRIBUTE DESCRIPTION
DagFlowTemplate

FuncFlowTemplate

LinearFlowTemplate

DagFlow

Bases: JobMixin[IsDagFlowTemplate[_CallP, _RetT], IsDagFlow[_CallP, _RetT], _CallP, _RetT], FlowBase, TaskTemplateArgsJobBase[IsDagFlowTemplate[_CallP, _RetT], IsDagFlow[_CallP, _RetT], _CallP, _RetT], Generic[_CallP, _RetT]

METHOD DESCRIPTION
__init__
accept_mixin
create_job
has_coroutine_func
log
reset_mixins
revise
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobConfig

engine

TYPE: IsEngine | None

in_flow_context

TYPE: bool

logger

TYPE: Logger

task_templates

TYPE: tuple[IsTaskTemplate, ...]

time_of_cur_toplevel_flow_run

TYPE: datetime | None

Source code in src/omnipy/compute/flow.py
class DagFlow(JobMixin[IsDagFlowTemplate[_CallP, _RetT], IsDagFlow[_CallP, _RetT], _CallP, _RetT],
              FlowBase,
              TaskTemplateArgsJobBase[IsDagFlowTemplate[_CallP, _RetT],
                                      IsDagFlow[_CallP, _RetT],
                                      _CallP,
                                      _RetT],
              Generic[_CallP, _RetT]):
    def _apply_engine_decorator(self, engine: IsEngine) -> None:
        if self.engine:
            engine = cast(IsDagFlowRunnerEngine, self.engine)
            self_with_mixins = cast(IsDagFlow, self)
            engine.apply_dag_flow_decorator(self_with_mixins, self._accept_call_func_decorator)

    @classmethod
    def _get_job_template_subcls_for_revise(cls) -> type[IsDagFlowTemplate[_CallP, _RetT]]:
        return cast(type[IsDagFlowTemplate[_CallP, _RetT]], DagFlowTemplate)

config property

config: IsJobConfig

engine property

engine: IsEngine | None

in_flow_context property

in_flow_context: bool

logger property

logger: Logger

task_templates property

task_templates: tuple[IsTaskTemplate, ...]

time_of_cur_toplevel_flow_run property

time_of_cur_toplevel_flow_run: datetime | None

__init__

__init__(*args, **kwargs)
Source code in src/omnipy/compute/_job.py
def __init__(self, *args, **kwargs):
    if JobBase not in self.__class__.__mro__:
        raise TypeError('JobMixin is not meant to be instantiated outside the context '
                        'of a JobBase subclass.')

accept_mixin classmethod

accept_mixin(mixin_cls: Type) -> None
Source code in src/omnipy/util/mixin.py
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    cls._accept_mixin(mixin_cls, update=True)

create_job classmethod

create_job(*args: object, **kwargs: object) -> _JobT
Source code in src/omnipy/compute/_job.py
@classmethod
def create_job(cls, *args: object, **kwargs: object) -> _JobT:
    cls_as_job_base = cast(IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT], cls)
    return cls_as_job_base._create_job(*args, **kwargs)

has_coroutine_func

has_coroutine_func() -> bool
Source code in src/omnipy/compute/_func_job.py
def has_coroutine_func(self) -> bool:
    return asyncio.iscoroutinefunction(self._job_func)

log

log(log_msg: str, level: int = INFO, datetime_obj: datetime | None = None)
Source code in src/omnipy/hub/log/mixin.py
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
    if self._logger is not None:
        create_time = datetime_obj.timestamp() if datetime_obj else time.time()
        self._logger.log(level, log_msg, extra=dict(timestamp=create_time))

reset_mixins classmethod

reset_mixins()
Source code in src/omnipy/util/mixin.py
@classmethod
def reset_mixins(cls):
    cls._mixin_classes.clear()
    cls._init_params_per_mixin_cls.clear()
    cls.__init__.__signature__ = cls._orig_init_signature

revise

revise() -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
def revise(self) -> _JobTemplateT:
    self_as_job_base = cast(
        IsJobBase[IsJobTemplate[_JobTemplateT, _JobT, _CallP, _RetT], _JobT, _CallP, _RetT],
        self)
    job_template = self_as_job_base._revise()
    update_wrapper(job_template, self, updated=[])
    return cast(_JobTemplateT, job_template)

DagFlowTemplateCore

Bases: TaskTemplateArgsJobBase[IsDagFlowTemplate[_CallP, _RetT], IsDagFlow[_CallP, _RetT], _CallP, _RetT], JobTemplateMixin[IsDagFlowTemplate[_CallP, _RetT], IsDagFlow[_CallP, _RetT], _CallP, _RetT], FlowBase, Generic[_CallP, _RetT]

METHOD DESCRIPTION
__init__
accept_mixin
apply
create_job_template
has_coroutine_func
log
refine
reset_mixins
run
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobConfig

engine

TYPE: IsEngine | None

in_flow_context

TYPE: bool

logger

TYPE: Logger

task_templates

TYPE: tuple[IsTaskTemplate, ...]

Source code in src/omnipy/compute/flow.py
class DagFlowTemplateCore(TaskTemplateArgsJobBase[IsDagFlowTemplate[_CallP, _RetT],
                                                  IsDagFlow[_CallP, _RetT],
                                                  _CallP,
                                                  _RetT],
                          JobTemplateMixin[IsDagFlowTemplate[_CallP, _RetT],
                                           IsDagFlow[_CallP, _RetT],
                                           _CallP,
                                           _RetT],
                          FlowBase,
                          Generic[_CallP, _RetT]):
    @classmethod
    def _get_job_subcls_for_apply(cls) -> type[IsDagFlow[_CallP, _RetT]]:
        return cast(type[IsDagFlow[_CallP, _RetT]], DagFlow[_CallP, _RetT])

config property

config: IsJobConfig

engine property

engine: IsEngine | None

in_flow_context property

in_flow_context: bool

logger property

logger: Logger

task_templates property

task_templates: tuple[IsTaskTemplate, ...]

__init__

__init__(
    job_func: Callable[_CallP, _RetT], /, *task_templates: IsTaskTemplate, **kwargs: object
) -> None
Source code in src/omnipy/compute/_tasklist_job.py
def __init__(self,
             job_func: Callable[_CallP, _RetT],
             /,
             *task_templates: IsTaskTemplate,
             **kwargs: object) -> None:
    self._task_templates: tuple[IsTaskTemplate, ...] = task_templates

accept_mixin classmethod

accept_mixin(mixin_cls: Type) -> None
Source code in src/omnipy/util/mixin.py
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    cls._accept_mixin(mixin_cls, update=True)

apply

apply() -> _JobT
Source code in src/omnipy/compute/_job.py
def apply(self) -> _JobT:
    job = self._cast_to_job_tmpl()._apply()
    update_wrapper(job, self, updated=[])
    return cast(_JobT, job)

create_job_template classmethod

create_job_template(*args: object, **kwargs: object) -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
@classmethod
def create_job_template(cls, *args: object, **kwargs: object) -> _JobTemplateT:
    cls_as_job_base = cast(type[IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT]], cls)
    return cls_as_job_base._create_job_template(*args, **kwargs)

has_coroutine_func

has_coroutine_func() -> bool
Source code in src/omnipy/compute/_func_job.py
def has_coroutine_func(self) -> bool:
    return asyncio.iscoroutinefunction(self._job_func)

log

log(log_msg: str, level: int = INFO, datetime_obj: datetime | None = None)
Source code in src/omnipy/hub/log/mixin.py
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
    if self._logger is not None:
        create_time = datetime_obj.timestamp() if datetime_obj else time.time()
        self._logger.log(level, log_msg, extra=dict(timestamp=create_time))

refine

refine(*args: Any, update: bool = True, **kwargs: object) -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
def refine(self, *args: Any, update: bool = True, **kwargs: object) -> _JobTemplateT:
    self_as_job_base = cast(IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT], self)
    return self_as_job_base._refine(*args, update=update, **kwargs)

reset_mixins classmethod

reset_mixins()
Source code in src/omnipy/util/mixin.py
@classmethod
def reset_mixins(cls):
    cls._mixin_classes.clear()
    cls._init_params_per_mixin_cls.clear()
    cls.__init__.__signature__ = cls._orig_init_signature

run

run(*args: _CallP.args, **kwargs: _CallP.kwargs) -> _RetT
Source code in src/omnipy/compute/_job.py
def run(self, *args: _CallP.args, **kwargs: _CallP.kwargs) -> _RetT:
    # TODO: Using JobTemplateMixin.run() inside flows should give error message
    return self._cast_to_job_tmpl().apply()(*args, **kwargs)

FlowBase

Source code in src/omnipy/compute/flow.py
class FlowBase:
    ...

FuncFlow

Bases: JobMixin[IsFuncFlowTemplate[_CallP, _RetT], IsFuncFlow[_CallP, _RetT], _CallP, _RetT], FlowBase, FuncArgJobBase[IsFuncFlowTemplate[_CallP, _RetT], IsFuncFlow[_CallP, _RetT], _CallP, _RetT], Generic[_CallP, _RetT]

METHOD DESCRIPTION
__init__
accept_mixin
create_job
has_coroutine_func
log
reset_mixins
revise
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobConfig

engine

TYPE: IsEngine | None

in_flow_context

TYPE: bool

logger

TYPE: Logger

time_of_cur_toplevel_flow_run

TYPE: datetime | None

Source code in src/omnipy/compute/flow.py
class FuncFlow(JobMixin[IsFuncFlowTemplate[_CallP, _RetT], IsFuncFlow[_CallP, _RetT], _CallP,
                        _RetT],
               FlowBase,
               FuncArgJobBase[IsFuncFlowTemplate[_CallP, _RetT],
                              IsFuncFlow[_CallP, _RetT],
                              _CallP,
                              _RetT],
               Generic[_CallP, _RetT]):
    def _apply_engine_decorator(self, engine: IsEngine) -> None:
        if self.engine:
            engine = cast(IsFuncFlowRunnerEngine, self.engine)
            self_with_mixins = cast(IsFuncFlow, self)
            engine.apply_func_flow_decorator(self_with_mixins, self._accept_call_func_decorator)

    @classmethod
    def _get_job_template_subcls_for_revise(cls) -> type[IsFuncFlowTemplate[_CallP, _RetT]]:
        return cast(type[IsFuncFlowTemplate[_CallP, _RetT]], FuncFlowTemplate)

config property

config: IsJobConfig

engine property

engine: IsEngine | None

in_flow_context property

in_flow_context: bool

logger property

logger: Logger

time_of_cur_toplevel_flow_run property

time_of_cur_toplevel_flow_run: datetime | None

__init__

__init__(*args, **kwargs)
Source code in src/omnipy/compute/_job.py
def __init__(self, *args, **kwargs):
    if JobBase not in self.__class__.__mro__:
        raise TypeError('JobMixin is not meant to be instantiated outside the context '
                        'of a JobBase subclass.')

accept_mixin classmethod

accept_mixin(mixin_cls: Type) -> None
Source code in src/omnipy/util/mixin.py
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    cls._accept_mixin(mixin_cls, update=True)

create_job classmethod

create_job(*args: object, **kwargs: object) -> _JobT
Source code in src/omnipy/compute/_job.py
@classmethod
def create_job(cls, *args: object, **kwargs: object) -> _JobT:
    cls_as_job_base = cast(IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT], cls)
    return cls_as_job_base._create_job(*args, **kwargs)

has_coroutine_func

has_coroutine_func() -> bool
Source code in src/omnipy/compute/_func_job.py
def has_coroutine_func(self) -> bool:
    return asyncio.iscoroutinefunction(self._job_func)

log

log(log_msg: str, level: int = INFO, datetime_obj: datetime | None = None)
Source code in src/omnipy/hub/log/mixin.py
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
    if self._logger is not None:
        create_time = datetime_obj.timestamp() if datetime_obj else time.time()
        self._logger.log(level, log_msg, extra=dict(timestamp=create_time))

reset_mixins classmethod

reset_mixins()
Source code in src/omnipy/util/mixin.py
@classmethod
def reset_mixins(cls):
    cls._mixin_classes.clear()
    cls._init_params_per_mixin_cls.clear()
    cls.__init__.__signature__ = cls._orig_init_signature

revise

revise() -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
def revise(self) -> _JobTemplateT:
    self_as_job_base = cast(
        IsJobBase[IsJobTemplate[_JobTemplateT, _JobT, _CallP, _RetT], _JobT, _CallP, _RetT],
        self)
    job_template = self_as_job_base._revise()
    update_wrapper(job_template, self, updated=[])
    return cast(_JobTemplateT, job_template)

FuncFlowTemplateCore

Bases: FuncArgJobBase[IsFuncFlowTemplate[_CallP, _RetT], IsFuncFlow[_CallP, _RetT], _CallP, _RetT], JobTemplateMixin[IsFuncFlowTemplate[_CallP, _RetT], IsFuncFlow[_CallP, _RetT], _CallP, _RetT], FlowBase, Generic[_CallP, _RetT]

METHOD DESCRIPTION
__init__
accept_mixin
apply
create_job_template
has_coroutine_func
log
refine
reset_mixins
run
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobConfig

engine

TYPE: IsEngine | None

in_flow_context

TYPE: bool

logger

TYPE: Logger

Source code in src/omnipy/compute/flow.py
class FuncFlowTemplateCore(FuncArgJobBase[IsFuncFlowTemplate[_CallP, _RetT],
                                          IsFuncFlow[_CallP, _RetT],
                                          _CallP,
                                          _RetT],
                           JobTemplateMixin[IsFuncFlowTemplate[_CallP, _RetT],
                                            IsFuncFlow[_CallP, _RetT],
                                            _CallP,
                                            _RetT],
                           FlowBase,
                           Generic[_CallP, _RetT]):
    @classmethod
    def _get_job_subcls_for_apply(cls) -> type[IsFuncFlow[_CallP, _RetT]]:
        return cast(type[IsFuncFlow[_CallP, _RetT]], FuncFlow[_CallP, _RetT])

config property

config: IsJobConfig

engine property

engine: IsEngine | None

in_flow_context property

in_flow_context: bool

logger property

logger: Logger

__init__

__init__(job_func: Callable[_CallP, _RetT], /, *args: object, **kwargs: object) -> None
Source code in src/omnipy/compute/_func_job.py
def __init__(self, job_func: Callable[_CallP, _RetT], /, *args: object,
             **kwargs: object) -> None:
    self._job_func = job_func

accept_mixin classmethod

accept_mixin(mixin_cls: Type) -> None
Source code in src/omnipy/util/mixin.py
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    cls._accept_mixin(mixin_cls, update=True)

apply

apply() -> _JobT
Source code in src/omnipy/compute/_job.py
def apply(self) -> _JobT:
    job = self._cast_to_job_tmpl()._apply()
    update_wrapper(job, self, updated=[])
    return cast(_JobT, job)

create_job_template classmethod

create_job_template(*args: object, **kwargs: object) -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
@classmethod
def create_job_template(cls, *args: object, **kwargs: object) -> _JobTemplateT:
    cls_as_job_base = cast(type[IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT]], cls)
    return cls_as_job_base._create_job_template(*args, **kwargs)

has_coroutine_func

has_coroutine_func() -> bool
Source code in src/omnipy/compute/_func_job.py
def has_coroutine_func(self) -> bool:
    return asyncio.iscoroutinefunction(self._job_func)

log

log(log_msg: str, level: int = INFO, datetime_obj: datetime | None = None)
Source code in src/omnipy/hub/log/mixin.py
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
    if self._logger is not None:
        create_time = datetime_obj.timestamp() if datetime_obj else time.time()
        self._logger.log(level, log_msg, extra=dict(timestamp=create_time))

refine

refine(*args: Any, update: bool = True, **kwargs: object) -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
def refine(self, *args: Any, update: bool = True, **kwargs: object) -> _JobTemplateT:
    self_as_job_base = cast(IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT], self)
    return self_as_job_base._refine(*args, update=update, **kwargs)

reset_mixins classmethod

reset_mixins()
Source code in src/omnipy/util/mixin.py
@classmethod
def reset_mixins(cls):
    cls._mixin_classes.clear()
    cls._init_params_per_mixin_cls.clear()
    cls.__init__.__signature__ = cls._orig_init_signature

run

run(*args: _CallP.args, **kwargs: _CallP.kwargs) -> _RetT
Source code in src/omnipy/compute/_job.py
def run(self, *args: _CallP.args, **kwargs: _CallP.kwargs) -> _RetT:
    # TODO: Using JobTemplateMixin.run() inside flows should give error message
    return self._cast_to_job_tmpl().apply()(*args, **kwargs)

LinearFlow

Bases: JobMixin[IsLinearFlowTemplate[_CallP, _RetT], IsLinearFlow[_CallP, _RetT], _CallP, _RetT], FlowBase, TaskTemplateArgsJobBase[IsLinearFlowTemplate[_CallP, _RetT], IsLinearFlow[_CallP, _RetT], _CallP, _RetT], Generic[_CallP, _RetT]

METHOD DESCRIPTION
__init__
accept_mixin
create_job
has_coroutine_func
log
reset_mixins
revise
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobConfig

engine

TYPE: IsEngine | None

in_flow_context

TYPE: bool

logger

TYPE: Logger

task_templates

TYPE: tuple[IsTaskTemplate, ...]

time_of_cur_toplevel_flow_run

TYPE: datetime | None

Source code in src/omnipy/compute/flow.py
class LinearFlow(JobMixin[IsLinearFlowTemplate[_CallP, _RetT],
                          IsLinearFlow[_CallP, _RetT],
                          _CallP,
                          _RetT],
                 FlowBase,
                 TaskTemplateArgsJobBase[IsLinearFlowTemplate[_CallP, _RetT],
                                         IsLinearFlow[_CallP, _RetT],
                                         _CallP,
                                         _RetT],
                 Generic[_CallP, _RetT]):
    def _apply_engine_decorator(self, engine: IsEngine) -> None:
        if self.engine:
            engine = cast(IsLinearFlowRunnerEngine, self.engine)
            self_with_mixins = cast(IsLinearFlow, self)
            engine.apply_linear_flow_decorator(self_with_mixins, self._accept_call_func_decorator)

    @classmethod
    def _get_job_template_subcls_for_revise(cls) -> type[IsLinearFlowTemplate[_CallP, _RetT]]:
        return cast(type[IsLinearFlowTemplate[_CallP, _RetT]], LinearFlowTemplate)

config property

config: IsJobConfig

engine property

engine: IsEngine | None

in_flow_context property

in_flow_context: bool

logger property

logger: Logger

task_templates property

task_templates: tuple[IsTaskTemplate, ...]

time_of_cur_toplevel_flow_run property

time_of_cur_toplevel_flow_run: datetime | None

__init__

__init__(*args, **kwargs)
Source code in src/omnipy/compute/_job.py
def __init__(self, *args, **kwargs):
    if JobBase not in self.__class__.__mro__:
        raise TypeError('JobMixin is not meant to be instantiated outside the context '
                        'of a JobBase subclass.')

accept_mixin classmethod

accept_mixin(mixin_cls: Type) -> None
Source code in src/omnipy/util/mixin.py
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    cls._accept_mixin(mixin_cls, update=True)

create_job classmethod

create_job(*args: object, **kwargs: object) -> _JobT
Source code in src/omnipy/compute/_job.py
@classmethod
def create_job(cls, *args: object, **kwargs: object) -> _JobT:
    cls_as_job_base = cast(IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT], cls)
    return cls_as_job_base._create_job(*args, **kwargs)

has_coroutine_func

has_coroutine_func() -> bool
Source code in src/omnipy/compute/_func_job.py
def has_coroutine_func(self) -> bool:
    return asyncio.iscoroutinefunction(self._job_func)

log

log(log_msg: str, level: int = INFO, datetime_obj: datetime | None = None)
Source code in src/omnipy/hub/log/mixin.py
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
    if self._logger is not None:
        create_time = datetime_obj.timestamp() if datetime_obj else time.time()
        self._logger.log(level, log_msg, extra=dict(timestamp=create_time))

reset_mixins classmethod

reset_mixins()
Source code in src/omnipy/util/mixin.py
@classmethod
def reset_mixins(cls):
    cls._mixin_classes.clear()
    cls._init_params_per_mixin_cls.clear()
    cls.__init__.__signature__ = cls._orig_init_signature

revise

revise() -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
def revise(self) -> _JobTemplateT:
    self_as_job_base = cast(
        IsJobBase[IsJobTemplate[_JobTemplateT, _JobT, _CallP, _RetT], _JobT, _CallP, _RetT],
        self)
    job_template = self_as_job_base._revise()
    update_wrapper(job_template, self, updated=[])
    return cast(_JobTemplateT, job_template)

LinearFlowTemplateCore

Bases: TaskTemplateArgsJobBase[IsLinearFlowTemplate[_CallP, _RetT], IsLinearFlow[_CallP, _RetT], _CallP, _RetT], JobTemplateMixin[IsLinearFlowTemplate[_CallP, _RetT], IsLinearFlow[_CallP, _RetT], _CallP, _RetT], FlowBase, Generic[_CallP, _RetT]

METHOD DESCRIPTION
__init__
accept_mixin
apply
create_job_template
has_coroutine_func
log
refine
reset_mixins
run
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobConfig

engine

TYPE: IsEngine | None

in_flow_context

TYPE: bool

logger

TYPE: Logger

task_templates

TYPE: tuple[IsTaskTemplate, ...]

Source code in src/omnipy/compute/flow.py
class LinearFlowTemplateCore(TaskTemplateArgsJobBase[IsLinearFlowTemplate[_CallP, _RetT],
                                                     IsLinearFlow[_CallP, _RetT],
                                                     _CallP,
                                                     _RetT],
                             JobTemplateMixin[IsLinearFlowTemplate[_CallP, _RetT],
                                              IsLinearFlow[_CallP, _RetT],
                                              _CallP,
                                              _RetT],
                             FlowBase,
                             Generic[_CallP, _RetT]):
    @classmethod
    def _get_job_subcls_for_apply(cls) -> type[IsLinearFlow[_CallP, _RetT]]:
        return cast(type[IsLinearFlow[_CallP, _RetT]], LinearFlow[_CallP, _RetT])

config property

config: IsJobConfig

engine property

engine: IsEngine | None

in_flow_context property

in_flow_context: bool

logger property

logger: Logger

task_templates property

task_templates: tuple[IsTaskTemplate, ...]

__init__

__init__(
    job_func: Callable[_CallP, _RetT], /, *task_templates: IsTaskTemplate, **kwargs: object
) -> None
Source code in src/omnipy/compute/_tasklist_job.py
def __init__(self,
             job_func: Callable[_CallP, _RetT],
             /,
             *task_templates: IsTaskTemplate,
             **kwargs: object) -> None:
    self._task_templates: tuple[IsTaskTemplate, ...] = task_templates

accept_mixin classmethod

accept_mixin(mixin_cls: Type) -> None
Source code in src/omnipy/util/mixin.py
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    cls._accept_mixin(mixin_cls, update=True)

apply

apply() -> _JobT
Source code in src/omnipy/compute/_job.py
def apply(self) -> _JobT:
    job = self._cast_to_job_tmpl()._apply()
    update_wrapper(job, self, updated=[])
    return cast(_JobT, job)

create_job_template classmethod

create_job_template(*args: object, **kwargs: object) -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
@classmethod
def create_job_template(cls, *args: object, **kwargs: object) -> _JobTemplateT:
    cls_as_job_base = cast(type[IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT]], cls)
    return cls_as_job_base._create_job_template(*args, **kwargs)

has_coroutine_func

has_coroutine_func() -> bool
Source code in src/omnipy/compute/_func_job.py
def has_coroutine_func(self) -> bool:
    return asyncio.iscoroutinefunction(self._job_func)

log

log(log_msg: str, level: int = INFO, datetime_obj: datetime | None = None)
Source code in src/omnipy/hub/log/mixin.py
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
    if self._logger is not None:
        create_time = datetime_obj.timestamp() if datetime_obj else time.time()
        self._logger.log(level, log_msg, extra=dict(timestamp=create_time))

refine

refine(*args: Any, update: bool = True, **kwargs: object) -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
def refine(self, *args: Any, update: bool = True, **kwargs: object) -> _JobTemplateT:
    self_as_job_base = cast(IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT], self)
    return self_as_job_base._refine(*args, update=update, **kwargs)

reset_mixins classmethod

reset_mixins()
Source code in src/omnipy/util/mixin.py
@classmethod
def reset_mixins(cls):
    cls._mixin_classes.clear()
    cls._init_params_per_mixin_cls.clear()
    cls.__init__.__signature__ = cls._orig_init_signature

run

run(*args: _CallP.args, **kwargs: _CallP.kwargs) -> _RetT
Source code in src/omnipy/compute/_job.py
def run(self, *args: _CallP.args, **kwargs: _CallP.kwargs) -> _RetT:
    # TODO: Using JobTemplateMixin.run() inside flows should give error message
    return self._cast_to_job_tmpl().apply()(*args, **kwargs)

dag_flow_template_as_callable_decorator

dag_flow_template_as_callable_decorator(
    decorated_cls: Callable[Concatenate[_CallableT, _InitP], IsDagFlowTemplate],
) -> Callable[_InitP, Callable[[Callable[_CallP, _RetT]], IsDagFlowTemplate[_CallP, _RetT]]]
Source code in src/omnipy/compute/flow.py
def dag_flow_template_as_callable_decorator(
    decorated_cls: Callable[Concatenate[_CallableT, _InitP], IsDagFlowTemplate]) -> \
        Callable[_InitP, Callable[[Callable[_CallP, _RetT]], IsDagFlowTemplate[_CallP, _RetT]]]:
    return callable_decorator_cls(
        cast(
            Callable[Concatenate[Callable[_CallP, _RetT], _InitP], IsDagFlowTemplate[_CallP,
                                                                                     _RetT]],
            decorated_cls))

func_flow_template_as_callable_decorator

func_flow_template_as_callable_decorator(
    decorated_cls: Callable[Concatenate[_CallableT, _InitP], IsFuncFlowTemplate],
) -> Callable[_InitP, Callable[[Callable[_CallP, _RetT]], IsFuncFlowTemplate[_CallP, _RetT]]]
Source code in src/omnipy/compute/flow.py
def func_flow_template_as_callable_decorator(
    decorated_cls: Callable[Concatenate[_CallableT, _InitP], IsFuncFlowTemplate]) -> \
        Callable[_InitP, Callable[[Callable[_CallP, _RetT]], IsFuncFlowTemplate[_CallP, _RetT]]]:
    return callable_decorator_cls(
        cast(
            Callable[Concatenate[Callable[_CallP, _RetT], _InitP],
                     IsFuncFlowTemplate[_CallP, _RetT]],
            decorated_cls))

linear_flow_template_as_callable_decorator

linear_flow_template_as_callable_decorator(
    decorated_cls: Callable[Concatenate[_CallableT, _InitP], IsLinearFlowTemplate],
) -> Callable[_InitP, Callable[[Callable[_CallP, _RetT]], IsLinearFlowTemplate[_CallP, _RetT]]]
Source code in src/omnipy/compute/flow.py
def linear_flow_template_as_callable_decorator(
    decorated_cls: Callable[Concatenate[_CallableT, _InitP], IsLinearFlowTemplate]) -> \
        Callable[_InitP, Callable[[Callable[_CallP, _RetT]], IsLinearFlowTemplate[_CallP, _RetT]]]:
    return callable_decorator_cls(
        cast(
            Callable[Concatenate[Callable[_CallP, _RetT], _InitP],
                     IsLinearFlowTemplate[_CallP, _RetT]],
            decorated_cls))

to_dag_flow_template_init_protocol

to_dag_flow_template_init_protocol(
    decorated_cls: Callable[
        Concatenate[Callable[_CallP, _RetT], _InitP], DagFlowTemplateCore[_CallP, _RetT]
    ],
) -> HasTaskTemplateArgsJobTemplateInit[
    IsDagFlowTemplate[_CallP, _RetT], IsTaskTemplate, _CallP, _RetT
]
Source code in src/omnipy/compute/flow.py
def to_dag_flow_template_init_protocol(
    decorated_cls: Callable[Concatenate[Callable[_CallP, _RetT], _InitP],
                            DagFlowTemplateCore[_CallP, _RetT]]
) -> HasTaskTemplateArgsJobTemplateInit[
        IsDagFlowTemplate[_CallP, _RetT], IsTaskTemplate, _CallP, _RetT]:
    return cast(
        HasTaskTemplateArgsJobTemplateInit[IsDagFlowTemplate[_CallP, _RetT],
                                           IsTaskTemplate,
                                           _CallP,
                                           _RetT],
        decorated_cls)

to_func_flow_template_init_protocol

to_func_flow_template_init_protocol(
    decorated_cls: Callable[
        Concatenate[Callable[_CallP, _RetT], _InitP], FuncFlowTemplateCore[_CallP, _RetT]
    ],
) -> HasFuncArgJobTemplateInit[IsFuncFlowTemplate[_CallP, _RetT], _CallP, _RetT]
Source code in src/omnipy/compute/flow.py
def to_func_flow_template_init_protocol(
    decorated_cls: Callable[Concatenate[Callable[_CallP, _RetT], _InitP],
                            FuncFlowTemplateCore[_CallP, _RetT]]
) -> HasFuncArgJobTemplateInit[IsFuncFlowTemplate[_CallP, _RetT], _CallP, _RetT]:
    return cast(HasFuncArgJobTemplateInit[IsFuncFlowTemplate[_CallP, _RetT], _CallP, _RetT],
                decorated_cls)

to_linear_flow_template_init_protocol

to_linear_flow_template_init_protocol(
    decorated_cls: Callable[
        Concatenate[Callable[_CallP, _RetT], _InitP], LinearFlowTemplateCore[_CallP, _RetT]
    ],
) -> HasTaskTemplateArgsJobTemplateInit[
    IsLinearFlowTemplate[_CallP, _RetT], IsTaskTemplate, _CallP, _RetT
]
Source code in src/omnipy/compute/flow.py
def to_linear_flow_template_init_protocol(
    decorated_cls: Callable[Concatenate[Callable[_CallP, _RetT], _InitP],
                            LinearFlowTemplateCore[_CallP, _RetT]]
) -> HasTaskTemplateArgsJobTemplateInit[
        IsLinearFlowTemplate[_CallP, _RetT], IsTaskTemplate, _CallP, _RetT]:
    return cast(
        HasTaskTemplateArgsJobTemplateInit[IsLinearFlowTemplate[_CallP, _RetT],
                                           IsTaskTemplate,
                                           _CallP,
                                           _RetT],
        decorated_cls)