Skip to content

Module omnipy.api.protocols.private.compute.job

Overview

View Source
from datetime import datetime

from types import MappingProxyType

from typing import Any, Callable, Mapping, Protocol, Type

from omnipy.api.enums import (OutputStorageProtocolOptions,

                              PersistOutputsOptions,

                              RestoreOutputsOptions)

from omnipy.api.protocols.private.compute.job_creator import IsJobCreator

from omnipy.api.protocols.private.compute.mixins import IsUniquelyNamedJob

from omnipy.api.protocols.private.engine import IsEngine

from omnipy.api.protocols.private.log import CanLog

from omnipy.api.protocols.public.config import IsJobConfig

from omnipy.api.typedefs import (GeneralDecorator,

                                 JobT,

                                 JobTemplateT,

                                 TaskTemplateContraT,

                                 TaskTemplateCovT,

                                 TaskTemplateT)

class IsJobBase(CanLog, IsUniquelyNamedJob, Protocol):

    """"""

    @property

    def _job_creator(self) -> IsJobCreator:

        ...

    @property

    def config(self) -> IsJobConfig | None:

        ...

    @property

    def engine(self) -> IsEngine | None:

        ...

    @property

    def in_flow_context(self) -> bool:

        ...

    def __eq__(self, other: object):

        ...

    @classmethod

    def _create_job_template(cls, *args: object, **kwargs: object) -> 'IsJobTemplate':

        ...

    @classmethod

    def _create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

    def _apply(self) -> 'IsJob':

        ...

    def _refine(self, *args: Any, update: bool = True, **kwargs: object) -> 'IsJobTemplate':

        ...

    def _revise(self) -> 'IsJobTemplate':

        ...

    def _call_job_template(self, *args: object, **kwargs: object) -> object:

        ...

    def _call_job(self, *args: object, **kwargs: object) -> object:

        ...

class IsJob(IsJobBase, Protocol):

    """"""

    @property

    def time_of_cur_toplevel_flow_run(self) -> datetime | None:

        ...

    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

    def __call__(self, *args: object, **kwargs: object) -> object:

        ...

    def _apply_engine_decorator(self, engine: IsEngine) -> None:

        ...

class IsJobTemplate(IsJobBase, Protocol):

    """"""

    @classmethod

    def create_job_template(cls, *args: object, **kwargs: object) -> 'IsJobTemplate':

        ...

    def run(self, *args: object, **kwargs: object) -> object:

        ...

class IsFuncArgJobBase(IsJob, Protocol):

    """"""

    @property

    def param_signatures(self) -> MappingProxyType:

        ...

    @property

    def return_type(self) -> Type[object]:

        ...

    @property

    def iterate_over_data_files(self) -> bool:

        ...

    @property

    def persist_outputs(self) -> PersistOutputsOptions | None:

        ...

    @property

    def restore_outputs(self) -> RestoreOutputsOptions | None:

        ...

    @property

    def output_storage_protocol(self) -> OutputStorageProtocolOptions | None:

        ...

    @property

    def will_persist_outputs(self) -> PersistOutputsOptions:

        ...

    @property

    def will_restore_outputs(self) -> RestoreOutputsOptions:

        ...

    @property

    def output_storage_protocol_to_use(self) -> OutputStorageProtocolOptions:

        ...

    @property

    def result_key(self) -> str | None:

        ...

    @property

    def fixed_params(self) -> MappingProxyType[str, object]:

        ...

    @property

    def param_key_map(self) -> MappingProxyType[str, str]:

        ...

    def has_coroutine_func(self) -> bool:

        ...

    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...

class IsPlainFuncArgJobBase(Protocol):

    """"""

    _job_func: Callable

    def _accept_call_func_decorator(self, call_func_decorator: GeneralDecorator) -> None:

        ...

class IsFuncArgJob(IsFuncArgJobBase, Protocol[JobT]):

    """"""

    def revise(self) -> JobT:

        ...

class IsFuncArgJobTemplateCallable(Protocol[JobTemplateT]):

    """"""

    def __call__(

        self,

        name: str | None = None,

        iterate_over_data_files: bool = False,

        persist_outputs: PersistOutputsOptions | None = None,

        restore_outputs: RestoreOutputsOptions | None = None,

        result_key: str | None = None,

        fixed_params: Mapping[str, object] | None = None,

        param_key_map: Mapping[str, str] | None = None,

        **kwargs: object,

    ) -> Callable[[Callable], JobTemplateT]:

        ...

class IsFuncArgJobTemplate(IsJobTemplate, IsFuncArgJobBase, Protocol[JobTemplateT, JobT]):

    """"""

    def refine(self,

               *args: Any,

               update: bool = True,

               name: str | None = None,

               iterate_over_data_files: bool = False,

               persist_outputs: PersistOutputsOptions | None = None,

               restore_outputs: RestoreOutputsOptions | None = None,

               result_key: str | None = None,

               fixed_params: Mapping[str, object] | None = None,

               param_key_map: Mapping[str, str] | None = None,

               **kwargs: object) -> JobTemplateT:

        ...

    def apply(self) -> JobT:

        ...

class IsTaskTemplateArgsJobBase(IsFuncArgJobBase, Protocol[TaskTemplateCovT]):

    """"""

    @property

    def task_templates(self) -> tuple[TaskTemplateCovT, ...]:

        ...

class IsTaskTemplateArgsJob(IsTaskTemplateArgsJobBase[TaskTemplateCovT],

                            IsFuncArgJob[JobT],

                            Protocol[TaskTemplateCovT, JobT]):

    """"""

class IsTaskTemplateArgsJobTemplateCallable(Protocol[TaskTemplateContraT, JobTemplateT]):

    """"""

    def __call__(

        self,

        *task_templates: TaskTemplateContraT,

        name: str | None = None,

        iterate_over_data_files: bool = False,

        persist_outputs: PersistOutputsOptions | None = None,

        restore_outputs: RestoreOutputsOptions | None = None,

        result_key: str | None = None,

        fixed_params: Mapping[str, object] | None = None,

        param_key_map: Mapping[str, str] | None = None,

        **kwargs: object,

    ) -> Callable[[Callable], JobTemplateT]:

        ...

class IsTaskTemplateArgsJobTemplate(IsFuncArgJobTemplate[JobTemplateT, JobT],

                                    IsTaskTemplateArgsJobBase[TaskTemplateT],

                                    Protocol[TaskTemplateT, JobTemplateT, JobT]):

    """"""

    def refine(self,

               *task_templates: TaskTemplateT,

               update: bool = True,

               name: str | None = None,

               iterate_over_data_files: bool = False,

               fixed_params: Mapping[str, object] | None = None,

               param_key_map: Mapping[str, str] | None = None,

               result_key: str | None = None,

               persist_outputs: PersistOutputsOptions | None = None,

               restore_outputs: RestoreOutputsOptions | None = None,

               **kwargs: object) -> JobTemplateT:

        ...

Classes

IsFuncArgJob

class IsFuncArgJob(
    *args,
    **kwargs
)
View Source
class IsFuncArgJob(IsFuncArgJobBase, Protocol[JobT]):

    """"""

    def revise(self) -> JobT:

        ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJob
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
eq
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
revise
def revise(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def revise(self) -> JobT:

        ...

IsFuncArgJobBase

class IsFuncArgJobBase(
    *args,
    **kwargs
)
View Source
class IsFuncArgJobBase(IsJob, Protocol):

    """"""

    @property

    def param_signatures(self) -> MappingProxyType:

        ...

    @property

    def return_type(self) -> Type[object]:

        ...

    @property

    def iterate_over_data_files(self) -> bool:

        ...

    @property

    def persist_outputs(self) -> PersistOutputsOptions | None:

        ...

    @property

    def restore_outputs(self) -> RestoreOutputsOptions | None:

        ...

    @property

    def output_storage_protocol(self) -> OutputStorageProtocolOptions | None:

        ...

    @property

    def will_persist_outputs(self) -> PersistOutputsOptions:

        ...

    @property

    def will_restore_outputs(self) -> RestoreOutputsOptions:

        ...

    @property

    def output_storage_protocol_to_use(self) -> OutputStorageProtocolOptions:

        ...

    @property

    def result_key(self) -> str | None:

        ...

    @property

    def fixed_params(self) -> MappingProxyType[str, object]:

        ...

    @property

    def param_key_map(self) -> MappingProxyType[str, str]:

        ...

    def has_coroutine_func(self) -> bool:

        ...

    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJob
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
eq
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...

IsFuncArgJobTemplate

class IsFuncArgJobTemplate(
    *args,
    **kwargs
)
View Source
class IsFuncArgJobTemplate(IsJobTemplate, IsFuncArgJobBase, Protocol[JobTemplateT, JobT]):

    """"""

    def refine(self,

               *args: Any,

               update: bool = True,

               name: str | None = None,

               iterate_over_data_files: bool = False,

               persist_outputs: PersistOutputsOptions | None = None,

               restore_outputs: RestoreOutputsOptions | None = None,

               result_key: str | None = None,

               fixed_params: Mapping[str, object] | None = None,

               param_key_map: Mapping[str, str] | None = None,

               **kwargs: object) -> JobTemplateT:

        ...

    def apply(self) -> JobT:

        ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJob
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...
create_job_template
def create_job_template(
    *args: object,
    **kwargs: object
) -> IsJobTemplate

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJobTemplate
View Source
    @classmethod

    def create_job_template(cls, *args: object, **kwargs: object) -> 'IsJobTemplate':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
eq
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
apply
def apply(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def apply(self) -> JobT:

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
refine
def refine(
    self,
    *args: Any,
    update: bool = True,
    name: str | None = None,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    **kwargs: object
) -> +JobTemplateT

Parameters:

Name Type Description Default
args Any
update bool True
name str None
iterate_over_data_files bool
persist_outputs omnipy.api.enums.PersistOutputsOptions None
restore_outputs omnipy.api.enums.RestoreOutputsOptions None
result_key str None
fixed_params Optional[Mapping[str, object]]
param_key_map Optional[Mapping[str, str]]
kwargs object

Returns:

Type Description
+JobTemplateT
View Source
    def refine(self,

               *args: Any,

               update: bool = True,

               name: str | None = None,

               iterate_over_data_files: bool = False,

               persist_outputs: PersistOutputsOptions | None = None,

               restore_outputs: RestoreOutputsOptions | None = None,

               result_key: str | None = None,

               fixed_params: Mapping[str, object] | None = None,

               param_key_map: Mapping[str, str] | None = None,

               **kwargs: object) -> JobTemplateT:

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
run
def run(
    self,
    *args: object,
    **kwargs: object
) -> object

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def run(self, *args: object, **kwargs: object) -> object:

        ...

IsFuncArgJobTemplateCallable

class IsFuncArgJobTemplateCallable(
    *args,
    **kwargs
)
View Source
class IsFuncArgJobTemplateCallable(Protocol[JobTemplateT]):

    """"""

    def __call__(

        self,

        name: str | None = None,

        iterate_over_data_files: bool = False,

        persist_outputs: PersistOutputsOptions | None = None,

        restore_outputs: RestoreOutputsOptions | None = None,

        result_key: str | None = None,

        fixed_params: Mapping[str, object] | None = None,

        param_key_map: Mapping[str, str] | None = None,

        **kwargs: object,

    ) -> Callable[[Callable], JobTemplateT]:

        ...

Methods

call
def __call__(
    self,
    name: str | None = None,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    **kwargs: object
) -> Callable[[Callable], +JobTemplateT]

Call self as a function.

Parameters:

Name Type Description Default
name str None
iterate_over_data_files bool
persist_outputs omnipy.api.enums.PersistOutputsOptions None
restore_outputs omnipy.api.enums.RestoreOutputsOptions None
result_key str None
fixed_params Optional[Mapping[str, object]]
param_key_map Optional[Mapping[str, str]]
kwargs object

Returns:

Type Description
Callable[[Callable], +JobTemplateT]
View Source
    def __call__(

        self,

        name: str | None = None,

        iterate_over_data_files: bool = False,

        persist_outputs: PersistOutputsOptions | None = None,

        restore_outputs: RestoreOutputsOptions | None = None,

        result_key: str | None = None,

        fixed_params: Mapping[str, object] | None = None,

        param_key_map: Mapping[str, str] | None = None,

        **kwargs: object,

    ) -> Callable[[Callable], JobTemplateT]:

        ...

IsJob

class IsJob(
    *args,
    **kwargs
)
View Source
class IsJob(IsJobBase, Protocol):

    """"""

    @property

    def time_of_cur_toplevel_flow_run(self) -> datetime | None:

        ...

    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

    def __call__(self, *args: object, **kwargs: object) -> object:

        ...

    def _apply_engine_decorator(self, engine: IsEngine) -> None:

        ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJob
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

Instance variables

config
engine
in_flow_context
logger
name
time_of_cur_toplevel_flow_run
unique_name

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
eq
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...

IsJobBase

class IsJobBase(
    *args,
    **kwargs
)
View Source
class IsJobBase(CanLog, IsUniquelyNamedJob, Protocol):

    """"""

    @property

    def _job_creator(self) -> IsJobCreator:

        ...

    @property

    def config(self) -> IsJobConfig | None:

        ...

    @property

    def engine(self) -> IsEngine | None:

        ...

    @property

    def in_flow_context(self) -> bool:

        ...

    def __eq__(self, other: object):

        ...

    @classmethod

    def _create_job_template(cls, *args: object, **kwargs: object) -> 'IsJobTemplate':

        ...

    @classmethod

    def _create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

    def _apply(self) -> 'IsJob':

        ...

    def _refine(self, *args: Any, update: bool = True, **kwargs: object) -> 'IsJobTemplate':

        ...

    def _revise(self) -> 'IsJobTemplate':

        ...

    def _call_job_template(self, *args: object, **kwargs: object) -> object:

        ...

    def _call_job(self, *args: object, **kwargs: object) -> object:

        ...

Instance variables

config
engine
in_flow_context
logger
name
unique_name

Methods

eq
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...

IsJobTemplate

class IsJobTemplate(
    *args,
    **kwargs
)
View Source
class IsJobTemplate(IsJobBase, Protocol):

    """"""

    @classmethod

    def create_job_template(cls, *args: object, **kwargs: object) -> 'IsJobTemplate':

        ...

    def run(self, *args: object, **kwargs: object) -> object:

        ...

Static methods

create_job_template
def create_job_template(
    *args: object,
    **kwargs: object
) -> IsJobTemplate

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJobTemplate
View Source
    @classmethod

    def create_job_template(cls, *args: object, **kwargs: object) -> 'IsJobTemplate':

        ...

Instance variables

config
engine
in_flow_context
logger
name
unique_name

Methods

eq
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
run
def run(
    self,
    *args: object,
    **kwargs: object
) -> object

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def run(self, *args: object, **kwargs: object) -> object:

        ...

IsPlainFuncArgJobBase

class IsPlainFuncArgJobBase(
    *args,
    **kwargs
)
View Source
class IsPlainFuncArgJobBase(Protocol):

    """"""

    _job_func: Callable

    def _accept_call_func_decorator(self, call_func_decorator: GeneralDecorator) -> None:

        ...

IsTaskTemplateArgsJob

class IsTaskTemplateArgsJob(
    *args,
    **kwargs
)
View Source
class IsTaskTemplateArgsJob(IsTaskTemplateArgsJobBase[TaskTemplateCovT],

                            IsFuncArgJob[JobT],

                            Protocol[TaskTemplateCovT, JobT]):

    """"""

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJob
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
task_templates
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
eq
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
revise
def revise(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def revise(self) -> JobT:

        ...

IsTaskTemplateArgsJobBase

class IsTaskTemplateArgsJobBase(
    *args,
    **kwargs
)
View Source
class IsTaskTemplateArgsJobBase(IsFuncArgJobBase, Protocol[TaskTemplateCovT]):

    """"""

    @property

    def task_templates(self) -> tuple[TaskTemplateCovT, ...]:

        ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJob
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
task_templates
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
eq
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...

IsTaskTemplateArgsJobTemplate

class IsTaskTemplateArgsJobTemplate(
    *args,
    **kwargs
)
View Source
class IsTaskTemplateArgsJobTemplate(IsFuncArgJobTemplate[JobTemplateT, JobT],

                                    IsTaskTemplateArgsJobBase[TaskTemplateT],

                                    Protocol[TaskTemplateT, JobTemplateT, JobT]):

    """"""

    def refine(self,

               *task_templates: TaskTemplateT,

               update: bool = True,

               name: str | None = None,

               iterate_over_data_files: bool = False,

               fixed_params: Mapping[str, object] | None = None,

               param_key_map: Mapping[str, str] | None = None,

               result_key: str | None = None,

               persist_outputs: PersistOutputsOptions | None = None,

               restore_outputs: RestoreOutputsOptions | None = None,

               **kwargs: object) -> JobTemplateT:

        ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJob
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...
create_job_template
def create_job_template(
    *args: object,
    **kwargs: object
) -> IsJobTemplate

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJobTemplate
View Source
    @classmethod

    def create_job_template(cls, *args: object, **kwargs: object) -> 'IsJobTemplate':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
task_templates
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
eq
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
apply
def apply(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def apply(self) -> JobT:

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
refine
def refine(
    self,
    *task_templates: ~TaskTemplateT,
    update: bool = True,
    name: str | None = None,
    iterate_over_data_files: bool = False,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    result_key: str | None = None,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    **kwargs: object
) -> +JobTemplateT

Parameters:

Name Type Description Default
task_templates ~TaskTemplateT
update bool True
name str None
iterate_over_data_files bool
fixed_params Optional[Mapping[str, object]]
param_key_map Optional[Mapping[str, str]]
result_key str None
persist_outputs omnipy.api.enums.PersistOutputsOptions None
restore_outputs omnipy.api.enums.RestoreOutputsOptions None
kwargs object

Returns:

Type Description
+JobTemplateT
View Source
    def refine(self,

               *task_templates: TaskTemplateT,

               update: bool = True,

               name: str | None = None,

               iterate_over_data_files: bool = False,

               fixed_params: Mapping[str, object] | None = None,

               param_key_map: Mapping[str, str] | None = None,

               result_key: str | None = None,

               persist_outputs: PersistOutputsOptions | None = None,

               restore_outputs: RestoreOutputsOptions | None = None,

               **kwargs: object) -> JobTemplateT:

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
run
def run(
    self,
    *args: object,
    **kwargs: object
) -> object

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def run(self, *args: object, **kwargs: object) -> object:

        ...

IsTaskTemplateArgsJobTemplateCallable

class IsTaskTemplateArgsJobTemplateCallable(
    *args,
    **kwargs
)
View Source
class IsTaskTemplateArgsJobTemplateCallable(Protocol[TaskTemplateContraT, JobTemplateT]):

    """"""

    def __call__(

        self,

        *task_templates: TaskTemplateContraT,

        name: str | None = None,

        iterate_over_data_files: bool = False,

        persist_outputs: PersistOutputsOptions | None = None,

        restore_outputs: RestoreOutputsOptions | None = None,

        result_key: str | None = None,

        fixed_params: Mapping[str, object] | None = None,

        param_key_map: Mapping[str, str] | None = None,

        **kwargs: object,

    ) -> Callable[[Callable], JobTemplateT]:

        ...

Methods

call
def __call__(
    self,
    *task_templates: -TaskTemplateContraT,
    name: str | None = None,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    **kwargs: object
) -> Callable[[Callable], +JobTemplateT]

Call self as a function.

Parameters:

Name Type Description Default
task_templates -TaskTemplateContraT
name str None
iterate_over_data_files bool
persist_outputs omnipy.api.enums.PersistOutputsOptions None
restore_outputs omnipy.api.enums.RestoreOutputsOptions None
result_key str None
fixed_params Optional[Mapping[str, object]]
param_key_map Optional[Mapping[str, str]]
kwargs object

Returns:

Type Description
Callable[[Callable], +JobTemplateT]
View Source
    def __call__(

        self,

        *task_templates: TaskTemplateContraT,

        name: str | None = None,

        iterate_over_data_files: bool = False,

        persist_outputs: PersistOutputsOptions | None = None,

        restore_outputs: RestoreOutputsOptions | None = None,

        result_key: str | None = None,

        fixed_params: Mapping[str, object] | None = None,

        param_key_map: Mapping[str, str] | None = None,

        **kwargs: object,

    ) -> Callable[[Callable], JobTemplateT]:

        ...