Skip to content

omnipy.compute.task

CLASS DESCRIPTION
Task
TaskBase
TaskTemplateCore
ATTRIBUTE DESCRIPTION
TaskTemplate

Task

Bases: JobMixin[IsTaskTemplate[_CallP, _RetT], IsTask[_CallP, _RetT], _CallP, _RetT], TaskBase, FuncArgJobBase[IsTaskTemplate[_CallP, _RetT], IsTask[_CallP, _RetT], _CallP, _RetT], Generic[_CallP, _RetT]

METHOD DESCRIPTION
__init__
accept_mixin
create_job
has_coroutine_func
log
reset_mixins
revise
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobConfig

engine

TYPE: IsEngine | None

in_flow_context

TYPE: bool

logger

TYPE: Logger

time_of_cur_toplevel_flow_run

TYPE: datetime | None

Source code in src/omnipy/compute/task.py
class Task(JobMixin[IsTaskTemplate[_CallP, _RetT], IsTask[_CallP, _RetT], _CallP, _RetT],
           TaskBase,
           FuncArgJobBase[IsTaskTemplate[_CallP, _RetT], IsTask[_CallP, _RetT], _CallP, _RetT],
           Generic[_CallP, _RetT]):
    """Job class for tasks, generic over the call parameters and the return type.

    Built from ``JobMixin``, ``TaskBase`` and ``FuncArgJobBase``. The class body
    itself only hooks the task up to its engine and names ``TaskTemplate`` as the
    class returned by ``_get_job_template_subcls_for_revise``.
    """
    def _apply_engine_decorator(self, engine: IsEngine) -> None:
        """Hand this task and its ``_accept_call_func_decorator`` to the engine.

        Calls ``apply_task_decorator`` on ``self.engine``; does nothing when
        ``self.engine`` is falsy.

        NOTE(review): the ``engine`` argument is never read -- the engine is
        always taken from ``self.engine``. Confirm that this is intended.
        """
        if not self.engine:
            return

        task_runner_engine = cast(IsTaskRunnerEngine, self.engine)
        task_with_mixins = cast(IsTask[_CallP, _RetT], self)
        task_runner_engine.apply_task_decorator(task_with_mixins,
                                                self._accept_call_func_decorator)

    @classmethod
    def _get_job_template_subcls_for_revise(cls) -> type[IsTaskTemplate[_CallP, _RetT]]:
        """Return ``TaskTemplate``, cast to the ``IsTaskTemplate`` protocol type."""
        template_cls = cast(type[IsTaskTemplate[_CallP, _RetT]], TaskTemplate)
        return template_cls

config property

config: IsJobConfig

engine property

engine: IsEngine | None

in_flow_context property

in_flow_context: bool

logger property

logger: Logger

time_of_cur_toplevel_flow_run property

time_of_cur_toplevel_flow_run: datetime | None

__init__

__init__(*args, **kwargs)
Source code in src/omnipy/compute/_job.py
def __init__(self, *args, **kwargs):
    """Refuse construction unless ``JobBase`` is among the class's ancestors.

    The positional and keyword arguments are accepted but not used here.

    Raises:
        TypeError: If ``JobBase`` is not in the MRO of ``self.__class__``.
    """
    if JobBase in self.__class__.__mro__:
        return

    raise TypeError('JobMixin is not meant to be instantiated outside the context '
                    'of a JobBase subclass.')

accept_mixin classmethod

accept_mixin(mixin_cls: Type) -> None
Source code in src/omnipy/util/mixin.py
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    """Forward ``mixin_cls`` to ``cls._accept_mixin`` with ``update=True``.

    Args:
        mixin_cls: The mixin class handed on to ``_accept_mixin``.
    """
    cls._accept_mixin(mixin_cls, update=True)

create_job classmethod

create_job(*args: object, **kwargs: object) -> _JobT
Source code in src/omnipy/compute/_job.py
@classmethod
def create_job(cls, *args: object, **kwargs: object) -> _JobT:
    """Create a job by delegating to ``cls._create_job``.

    All positional and keyword arguments are passed through unchanged.

    Returns:
        Whatever ``_create_job`` returns.
    """
    # ``cls`` is a class object, so the cast target is ``type[...]`` rather than
    # the instance protocol; this matches the cast in ``create_job_template``.
    # ``cast`` only informs the type checker and has no effect at runtime.
    cls_as_job_base = cast(type[IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT]], cls)
    return cls_as_job_base._create_job(*args, **kwargs)

has_coroutine_func

has_coroutine_func() -> bool
Source code in src/omnipy/compute/_func_job.py
def has_coroutine_func(self) -> bool:
    """Report whether ``self._job_func`` is a coroutine function, as judged by ``asyncio``."""
    job_func = self._job_func
    return asyncio.iscoroutinefunction(job_func)

log

log(log_msg: str, level: int = INFO, datetime_obj: datetime | None = None)
Source code in src/omnipy/hub/log/mixin.py
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
    if self._logger is not None:
        create_time = datetime_obj.timestamp() if datetime_obj else time.time()
        self._logger.log(level, log_msg, extra=dict(timestamp=create_time))

reset_mixins classmethod

reset_mixins()
Source code in src/omnipy/util/mixin.py
@classmethod
def reset_mixins(cls):
    """Clear the class's recorded mixin state and restore the saved ``__init__`` signature."""
    # Empty both class-level mixin containers in place.
    cls._mixin_classes.clear()
    cls._init_params_per_mixin_cls.clear()
    # Put back the signature stored in ``_orig_init_signature``.
    # NOTE(review): presumably ``_accept_mixin`` rewrites ``__init__.__signature__`` -- confirm.
    cls.__init__.__signature__ = cls._orig_init_signature

revise

revise() -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
def revise(self) -> _JobTemplateT:
    """Return the job template produced by ``_revise`` for this job.

    Before returning, ``update_wrapper`` copies this job's wrapper metadata onto
    the template. ``updated=[]`` is passed so that no attribute of the template
    is merged with this job's counterpart.
    """
    typed_self = cast(
        IsJobBase[IsJobTemplate[_JobTemplateT, _JobT, _CallP, _RetT], _JobT, _CallP, _RetT],
        self,
    )
    template = typed_self._revise()
    update_wrapper(template, self, updated=[])
    return cast(_JobTemplateT, template)

TaskBase

Source code in src/omnipy/compute/task.py
class TaskBase:
    """Empty base class listed among the bases of ``Task`` and ``TaskTemplateCore``."""
    # TODO: Can this and FlowBase be replaced with IsTask/IsFlow, or similar?

TaskTemplateCore

Bases: FuncArgJobBase[IsTaskTemplate[_CallP, _RetT], IsTask[_CallP, _RetT], _CallP, _RetT], JobTemplateMixin[IsTaskTemplate[_CallP, _RetT], IsTask[_CallP, _RetT], _CallP, _RetT], TaskBase, Generic[_CallP, _RetT]

METHOD DESCRIPTION
__init__
accept_mixin
apply
create_job_template
has_coroutine_func
log
refine
reset_mixins
run
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobConfig

engine

TYPE: IsEngine | None

in_flow_context

TYPE: bool

logger

TYPE: Logger

Source code in src/omnipy/compute/task.py
class TaskTemplateCore(FuncArgJobBase[IsTaskTemplate[_CallP, _RetT],
                                      IsTask[_CallP, _RetT],
                                      _CallP,
                                      _RetT],
                       JobTemplateMixin[IsTaskTemplate[_CallP, _RetT],
                                        IsTask[_CallP, _RetT],
                                        _CallP,
                                        _RetT],
                       TaskBase,
                       Generic[_CallP, _RetT]):
    # Job-template class for tasks, generic over the call parameters and the
    # return type. Built from ``FuncArgJobBase``, ``JobTemplateMixin`` and
    # ``TaskBase``; the body only names ``Task`` as the job class.
    # NOTE(review): the empty docstring below is kept as-is -- presumably it is
    # deliberate; confirm before replacing it with real text.
    """"""
    @classmethod
    def _get_job_subcls_for_apply(cls) -> type[IsTask[_CallP, _RetT]]:
        """Return ``Task[_CallP, _RetT]``, cast to the ``IsTask`` protocol type."""
        job_cls = cast(type[IsTask[_CallP, _RetT]], Task[_CallP, _RetT])
        return job_cls

config property

config: IsJobConfig

engine property

engine: IsEngine | None

in_flow_context property

in_flow_context: bool

logger property

logger: Logger

__init__

__init__(job_func: Callable[_CallP, _RetT], /, *args: object, **kwargs: object) -> None
Source code in src/omnipy/compute/_func_job.py
def __init__(self, job_func: Callable[_CallP, _RetT], /, *args: object,
             **kwargs: object) -> None:
    """Store the wrapped callable on the instance.

    Args:
        job_func: The callable kept as ``self._job_func`` (positional-only).
        *args: Accepted but not used in this method.
        **kwargs: Accepted but not used in this method.
    """
    self._job_func = job_func

accept_mixin classmethod

accept_mixin(mixin_cls: Type) -> None
Source code in src/omnipy/util/mixin.py
@classmethod
def accept_mixin(cls, mixin_cls: Type) -> None:
    """Forward ``mixin_cls`` to ``cls._accept_mixin`` with ``update=True``.

    Args:
        mixin_cls: The mixin class handed on to ``_accept_mixin``.
    """
    cls._accept_mixin(mixin_cls, update=True)

apply

apply() -> _JobT
Source code in src/omnipy/compute/_job.py
def apply(self) -> _JobT:
    """Return the job produced by this template's ``_apply``.

    Before returning, ``update_wrapper`` copies this template's wrapper metadata
    onto the job. ``updated=[]`` is passed so that no attribute of the job is
    merged with this template's counterpart.
    """
    template = self._cast_to_job_tmpl()
    applied_job = template._apply()
    update_wrapper(applied_job, self, updated=[])
    return cast(_JobT, applied_job)

create_job_template classmethod

create_job_template(*args: object, **kwargs: object) -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
@classmethod
def create_job_template(cls, *args: object, **kwargs: object) -> _JobTemplateT:
    """Create a job template by delegating to ``cls._create_job_template``.

    All positional and keyword arguments are passed through unchanged.
    """
    job_base_cls = cast(type[IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT]], cls)
    job_template = job_base_cls._create_job_template(*args, **kwargs)
    return job_template

has_coroutine_func

has_coroutine_func() -> bool
Source code in src/omnipy/compute/_func_job.py
def has_coroutine_func(self) -> bool:
    """Report whether ``self._job_func`` is a coroutine function, as judged by ``asyncio``."""
    job_func = self._job_func
    return asyncio.iscoroutinefunction(job_func)

log

log(log_msg: str, level: int = INFO, datetime_obj: datetime | None = None)
Source code in src/omnipy/hub/log/mixin.py
def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):
    if self._logger is not None:
        create_time = datetime_obj.timestamp() if datetime_obj else time.time()
        self._logger.log(level, log_msg, extra=dict(timestamp=create_time))

refine

refine(*args: Any, update: bool = True, **kwargs: object) -> _JobTemplateT
Source code in src/omnipy/compute/_job.py
def refine(self, *args: Any, update: bool = True, **kwargs: object) -> _JobTemplateT:
    """Return the job template produced by ``_refine``.

    Args:
        *args: Passed through to ``_refine`` unchanged.
        update: Passed through to ``_refine`` as the keyword ``update``.
        **kwargs: Passed through to ``_refine`` unchanged.
    """
    typed_self = cast(IsJobBase[_JobTemplateT, _JobT, _CallP, _RetT], self)
    refined_template = typed_self._refine(*args, update=update, **kwargs)
    return refined_template

reset_mixins classmethod

reset_mixins()
Source code in src/omnipy/util/mixin.py
@classmethod
def reset_mixins(cls):
    """Clear the class's recorded mixin state and restore the saved ``__init__`` signature."""
    # Empty both class-level mixin containers in place.
    cls._mixin_classes.clear()
    cls._init_params_per_mixin_cls.clear()
    # Put back the signature stored in ``_orig_init_signature``.
    # NOTE(review): presumably ``_accept_mixin`` rewrites ``__init__.__signature__`` -- confirm.
    cls.__init__.__signature__ = cls._orig_init_signature

run

run(*args: _CallP.args, **kwargs: _CallP.kwargs) -> _RetT
Source code in src/omnipy/compute/_job.py
def run(self, *args: _CallP.args, **kwargs: _CallP.kwargs) -> _RetT:
    """Apply this template and immediately call the resulting job.

    The positional and keyword arguments are passed to the job unchanged, and
    the job's return value is returned.
    """
    # TODO: Using JobTemplateMixin.run() inside flows should give error message
    template = self._cast_to_job_tmpl()
    job = template.apply()
    return job(*args, **kwargs)

task_template_as_callable_decorator

task_template_as_callable_decorator(
    decorated_cls: Callable[Concatenate[_CallableT, _InitP], IsTaskTemplate],
) -> Callable[_InitP, Callable[[Callable[_CallP, _RetT]], IsTaskTemplate[_CallP, _RetT]]]
Source code in src/omnipy/compute/task.py
def task_template_as_callable_decorator(
    decorated_cls: Callable[Concatenate[_CallableT, _InitP], IsTaskTemplate],
) -> Callable[_InitP, Callable[[Callable[_CallP, _RetT]], IsTaskTemplate[_CallP, _RetT]]]:
    """Wrap ``decorated_cls`` with ``callable_decorator_cls`` and return the result.

    ``decorated_cls`` is first cast to a callable that takes the decorated
    function followed by the init parameters and returns an ``IsTaskTemplate``
    typed on that function's signature; the cast only affects type checking.
    """
    typed_cls = cast(
        Callable[Concatenate[Callable[_CallP, _RetT], _InitP], IsTaskTemplate[_CallP, _RetT]],
        decorated_cls,
    )
    return callable_decorator_cls(typed_cls)

to_task_template_init_protocol

to_task_template_init_protocol(
    decorated_cls: Callable[
        Concatenate[Callable[_CallP, _RetT], _InitP], TaskTemplateCore[_CallP, _RetT]
    ],
) -> HasFuncArgJobTemplateInit[IsTaskTemplate[_CallP, _RetT], _CallP, _RetT]
Source code in src/omnipy/compute/task.py
def to_task_template_init_protocol(
    decorated_cls: Callable[Concatenate[Callable[_CallP, _RetT], _InitP],
                            TaskTemplateCore[_CallP, _RetT]],
) -> HasFuncArgJobTemplateInit[IsTaskTemplate[_CallP, _RetT], _CallP, _RetT]:
    """Return ``decorated_cls`` unchanged, retyped as ``HasFuncArgJobTemplateInit``.

    Only a ``cast`` is applied, so the object returned at runtime is the very
    same ``decorated_cls`` that was passed in.
    """
    init_protocol = cast(
        HasFuncArgJobTemplateInit[IsTaskTemplate[_CallP, _RetT], _CallP, _RetT],
        decorated_cls,
    )
    return init_protocol