Skip to content

Module omnipy.api.protocols.public.compute

Overview

View Source
from datetime import datetime

from typing import Protocol

from omnipy.api.protocols.private.compute.job import (IsFuncArgJob,

                                                      IsFuncArgJobTemplate,

                                                      IsTaskTemplateArgsJob,

                                                      IsTaskTemplateArgsJobTemplate)

from omnipy.api.protocols.private.compute.mixins import IsNestedContext

class IsTaskTemplate(IsFuncArgJobTemplate['IsTaskTemplate', 'IsTask'], Protocol):

    """
    Loosely coupled type replacement for the :py:class:`~omnipy.compute.task.TaskTemplate` class.

    Structural protocol that subscripts ``IsFuncArgJobTemplate`` with forward references to
    ``'IsTaskTemplate'`` (this protocol) and ``'IsTask'`` (declared later in this module).
    It declares no members of its own; all members come from ``IsFuncArgJobTemplate``.
    """

    ...

class IsTask(IsFuncArgJob[IsTaskTemplate], Protocol):

    """
    Structural protocol for tasks.

    Subscripts ``IsFuncArgJob`` with :py:class:`IsTaskTemplate` and declares no members of its
    own.

    NOTE(review): presumably the loosely coupled type replacement for the concrete task class
    that pairs with ``TaskTemplate`` -- confirm against the implementation.
    """

    ...

class IsFlowTemplate(Protocol):

    """
    Member-less base protocol shared by the flow template protocols.

    In this module it is listed as a base of ``IsLinearFlowTemplate``, ``IsDagFlowTemplate`` and
    ``IsFuncFlowTemplate``; it declares no attributes or methods itself.
    """

    ...

class IsFlow(Protocol):

    """
    Protocol declaring two properties for flows: ``flow_context`` and ``time_of_last_run``.

    In this module it is listed as a base of ``IsLinearFlow`` and ``IsDagFlow``.
    """

    @property

    def flow_context(self) -> IsNestedContext:

        """Return the flow context, an object satisfying ``IsNestedContext``."""

        ...

    @property

    def time_of_last_run(self) -> datetime | None:

        """
        Return the time of the last run as a ``datetime``, or ``None``.

        NOTE(review): ``None`` presumably means the flow has not been run yet -- confirm.
        """

        ...

class IsLinearFlowTemplate(IsTaskTemplateArgsJobTemplate[IsTaskTemplate,

                                                         'IsLinearFlowTemplate',

                                                         'IsLinearFlow'],

                           IsFlowTemplate,

                           Protocol):

    """
    Structural protocol for linear flow templates.

    Combines ``IsTaskTemplateArgsJobTemplate`` -- subscripted with :py:class:`IsTaskTemplate` and
    forward references to ``'IsLinearFlowTemplate'`` and ``'IsLinearFlow'`` -- with
    :py:class:`IsFlowTemplate`. It declares no members of its own.
    """

    ...

class IsLinearFlow(IsTaskTemplateArgsJob[IsTaskTemplate, IsLinearFlowTemplate], IsFlow, Protocol):

    """
    Structural protocol for linear flows.

    Combines ``IsTaskTemplateArgsJob`` -- subscripted with :py:class:`IsTaskTemplate` and
    :py:class:`IsLinearFlowTemplate` -- with :py:class:`IsFlow`, which contributes the
    ``flow_context`` and ``time_of_last_run`` properties. It declares no members of its own.
    """

    ...

class IsDagFlowTemplate(IsTaskTemplateArgsJobTemplate[IsTaskTemplate,

                                                      'IsDagFlowTemplate',

                                                      'IsDagFlow'],

                        IsFlowTemplate,

                        Protocol):

    """
    Structural protocol for DAG flow templates.

    Combines ``IsTaskTemplateArgsJobTemplate`` -- subscripted with :py:class:`IsTaskTemplate` and
    forward references to ``'IsDagFlowTemplate'`` and ``'IsDagFlow'`` -- with
    :py:class:`IsFlowTemplate`. It declares no members of its own.
    """

    ...

class IsDagFlow(IsTaskTemplateArgsJob[IsTaskTemplate, IsDagFlowTemplate], IsFlow, Protocol):

    """
    Structural protocol for DAG flows.

    Combines ``IsTaskTemplateArgsJob`` -- subscripted with :py:class:`IsTaskTemplate` and
    :py:class:`IsDagFlowTemplate` -- with :py:class:`IsFlow`, which contributes the
    ``flow_context`` and ``time_of_last_run`` properties. It declares no members of its own.
    """

    ...

class IsFuncFlowTemplate(IsFuncArgJobTemplate['IsFuncFlowTemplate', 'IsFuncFlow'],

                         IsFlowTemplate,

                         Protocol):

    """
    Structural protocol for function flow templates.

    Combines ``IsFuncArgJobTemplate`` -- subscripted with forward references to
    ``'IsFuncFlowTemplate'`` and ``'IsFuncFlow'`` -- with :py:class:`IsFlowTemplate`.
    It declares no members of its own.
    """

    ...

class IsFuncFlow(IsFuncArgJob[IsFuncFlowTemplate], Protocol):

    """
    Structural protocol for function flows.

    Subscripts ``IsFuncArgJob`` with :py:class:`IsFuncFlowTemplate` and declares no members of
    its own.
    """

    # NOTE(review): unlike IsLinearFlow and IsDagFlow, IsFlow is not among the bases here, so
    # flow_context and time_of_last_run are not part of this protocol -- confirm this is intended.

    ...

Classes

IsDagFlow

class IsDagFlow(
    *args,
    **kwargs
)
View Source
class IsDagFlow(IsTaskTemplateArgsJob[IsTaskTemplate, IsDagFlowTemplate], IsFlow, Protocol):

    """
    Structural protocol for DAG flows.

    Combines ``IsTaskTemplateArgsJob`` -- subscripted with :py:class:`IsTaskTemplate` and
    :py:class:`IsDagFlowTemplate` -- with :py:class:`IsFlow`, which contributes the
    ``flow_context`` and ``time_of_last_run`` properties. It declares no members of its own.
    """

    ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJob'
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

Instance variables

config
engine
fixed_params
flow_context
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
task_templates
time_of_cur_toplevel_flow_run
time_of_last_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

__call__
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
__eq__
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
revise
def revise(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def revise(self) -> JobT:

        ...

IsDagFlowTemplate

class IsDagFlowTemplate(
    *args,
    **kwargs
)
View Source
class IsDagFlowTemplate(IsTaskTemplateArgsJobTemplate[IsTaskTemplate,

                                                      'IsDagFlowTemplate',

                                                      'IsDagFlow'],

                        IsFlowTemplate,

                        Protocol):

    """
    Structural protocol for DAG flow templates.

    Combines ``IsTaskTemplateArgsJobTemplate`` -- subscripted with :py:class:`IsTaskTemplate` and
    forward references to ``'IsDagFlowTemplate'`` and ``'IsDagFlow'`` -- with
    :py:class:`IsFlowTemplate`. It declares no members of its own.
    """

    ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJob'
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...
create_job_template
def create_job_template(
    *args: object,
    **kwargs: object
) -> IsJobTemplate

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJobTemplate'
View Source
    @classmethod

    def create_job_template(cls, *args: object, **kwargs: object) -> 'IsJobTemplate':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
task_templates
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

__call__
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
__eq__
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
apply
def apply(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def apply(self) -> JobT:

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
refine
def refine(
    self,
    *task_templates: ~TaskTemplateT,
    update: bool = True,
    name: str | None = None,
    iterate_over_data_files: bool = False,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    result_key: str | None = None,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    **kwargs: object
) -> +JobTemplateT

Parameters:

Name Type Description Default
task_templates ~TaskTemplateT
update bool True
name str None
iterate_over_data_files bool False
fixed_params Optional[Mapping[str, object]] None
param_key_map Optional[Mapping[str, str]] None
result_key str None
persist_outputs omnipy.api.enums.PersistOutputsOptions None
restore_outputs omnipy.api.enums.RestoreOutputsOptions None
kwargs object

Returns:

Type Description
+JobTemplateT
View Source
    def refine(self,

               *task_templates: TaskTemplateT,

               update: bool = True,

               name: str | None = None,

               iterate_over_data_files: bool = False,

               fixed_params: Mapping[str, object] | None = None,

               param_key_map: Mapping[str, str] | None = None,

               result_key: str | None = None,

               persist_outputs: PersistOutputsOptions | None = None,

               restore_outputs: RestoreOutputsOptions | None = None,

               **kwargs: object) -> JobTemplateT:

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
run
def run(
    self,
    *args: object,
    **kwargs: object
) -> object

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def run(self, *args: object, **kwargs: object) -> object:

        ...

IsFlow

class IsFlow(
    *args,
    **kwargs
)
View Source
class IsFlow(Protocol):

    """
    Protocol declaring two properties for flows: ``flow_context`` and ``time_of_last_run``.

    In this module it is listed as a base of ``IsLinearFlow`` and ``IsDagFlow``.
    """

    @property

    def flow_context(self) -> IsNestedContext:

        """Return the flow context, an object satisfying ``IsNestedContext``."""

        ...

    @property

    def time_of_last_run(self) -> datetime | None:

        """
        Return the time of the last run as a ``datetime``, or ``None``.

        NOTE(review): ``None`` presumably means the flow has not been run yet -- confirm.
        """

        ...

Instance variables

flow_context
time_of_last_run

IsFlowTemplate

class IsFlowTemplate(
    *args,
    **kwargs
)
View Source
class IsFlowTemplate(Protocol):

    """
    Member-less base protocol shared by the flow template protocols.

    In this module it is listed as a base of ``IsLinearFlowTemplate``, ``IsDagFlowTemplate`` and
    ``IsFuncFlowTemplate``; it declares no attributes or methods itself.
    """

    ...

IsFuncFlow

class IsFuncFlow(
    *args,
    **kwargs
)
View Source
class IsFuncFlow(IsFuncArgJob[IsFuncFlowTemplate], Protocol):

    """
    Structural protocol for function flows.

    Subscripts ``IsFuncArgJob`` with :py:class:`IsFuncFlowTemplate` and declares no members of
    its own.
    """

    # NOTE(review): unlike IsLinearFlow and IsDagFlow, IsFlow is not among the bases here, so
    # flow_context and time_of_last_run are not part of this protocol -- confirm this is intended.

    ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJob'
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

__call__
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
__eq__
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
revise
def revise(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def revise(self) -> JobT:

        ...

IsFuncFlowTemplate

class IsFuncFlowTemplate(
    *args,
    **kwargs
)
View Source
class IsFuncFlowTemplate(IsFuncArgJobTemplate['IsFuncFlowTemplate', 'IsFuncFlow'],

                         IsFlowTemplate,

                         Protocol):

    """
    Structural protocol for function flow templates.

    Combines ``IsFuncArgJobTemplate`` -- subscripted with forward references to
    ``'IsFuncFlowTemplate'`` and ``'IsFuncFlow'`` -- with :py:class:`IsFlowTemplate`.
    It declares no members of its own.
    """

    ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJob'
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...
create_job_template
def create_job_template(
    *args: object,
    **kwargs: object
) -> IsJobTemplate

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJobTemplate'
View Source
    @classmethod

    def create_job_template(cls, *args: object, **kwargs: object) -> 'IsJobTemplate':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

__call__
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
__eq__
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
apply
def apply(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def apply(self) -> JobT:

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
refine
def refine(
    self,
    *args: Any,
    update: bool = True,
    name: str | None = None,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    **kwargs: object
) -> +JobTemplateT

Parameters:

Name Type Description Default
args Any
update bool True
name str None
iterate_over_data_files bool False
persist_outputs omnipy.api.enums.PersistOutputsOptions None
restore_outputs omnipy.api.enums.RestoreOutputsOptions None
result_key str None
fixed_params Optional[Mapping[str, object]] None
param_key_map Optional[Mapping[str, str]] None
kwargs object

Returns:

Type Description
+JobTemplateT
View Source
    def refine(self,

               *args: Any,

               update: bool = True,

               name: str | None = None,

               iterate_over_data_files: bool = False,

               persist_outputs: PersistOutputsOptions | None = None,

               restore_outputs: RestoreOutputsOptions | None = None,

               result_key: str | None = None,

               fixed_params: Mapping[str, object] | None = None,

               param_key_map: Mapping[str, str] | None = None,

               **kwargs: object) -> JobTemplateT:

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
run
def run(
    self,
    *args: object,
    **kwargs: object
) -> object

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def run(self, *args: object, **kwargs: object) -> object:

        ...

IsLinearFlow

class IsLinearFlow(
    *args,
    **kwargs
)
View Source
class IsLinearFlow(IsTaskTemplateArgsJob[IsTaskTemplate, IsLinearFlowTemplate], IsFlow, Protocol):

    """
    Structural protocol for linear flows.

    Combines ``IsTaskTemplateArgsJob`` -- subscripted with :py:class:`IsTaskTemplate` and
    :py:class:`IsLinearFlowTemplate` -- with :py:class:`IsFlow`, which contributes the
    ``flow_context`` and ``time_of_last_run`` properties. It declares no members of its own.
    """

    ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJob'
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

Instance variables

config
engine
fixed_params
flow_context
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
task_templates
time_of_cur_toplevel_flow_run
time_of_last_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

__call__
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
__eq__
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
revise
def revise(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def revise(self) -> JobT:

        ...

IsLinearFlowTemplate

class IsLinearFlowTemplate(
    *args,
    **kwargs
)
View Source
class IsLinearFlowTemplate(IsTaskTemplateArgsJobTemplate[IsTaskTemplate,

                                                         'IsLinearFlowTemplate',

                                                         'IsLinearFlow'],

                           IsFlowTemplate,

                           Protocol):

    """
    Structural protocol for linear flow templates.

    Combines ``IsTaskTemplateArgsJobTemplate`` -- subscripted with :py:class:`IsTaskTemplate` and
    forward references to ``'IsLinearFlowTemplate'`` and ``'IsLinearFlow'`` -- with
    :py:class:`IsFlowTemplate`. It declares no members of its own.
    """

    ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJob'
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...
create_job_template
def create_job_template(
    *args: object,
    **kwargs: object
) -> IsJobTemplate

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJobTemplate'
View Source
    @classmethod

    def create_job_template(cls, *args: object, **kwargs: object) -> 'IsJobTemplate':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
task_templates
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

__call__
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
__eq__
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
apply
def apply(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def apply(self) -> JobT:

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
refine
def refine(
    self,
    *task_templates: ~TaskTemplateT,
    update: bool = True,
    name: str | None = None,
    iterate_over_data_files: bool = False,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    result_key: str | None = None,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    **kwargs: object
) -> +JobTemplateT

Parameters:

Name Type Description Default
task_templates ~TaskTemplateT
update bool True
name str None
iterate_over_data_files bool False
fixed_params Optional[Mapping[str, object]] None
param_key_map Optional[Mapping[str, str]] None
result_key str None
persist_outputs omnipy.api.enums.PersistOutputsOptions None
restore_outputs omnipy.api.enums.RestoreOutputsOptions None
kwargs object

Returns:

Type Description
+JobTemplateT
View Source
    def refine(self,

               *task_templates: TaskTemplateT,

               update: bool = True,

               name: str | None = None,

               iterate_over_data_files: bool = False,

               fixed_params: Mapping[str, object] | None = None,

               param_key_map: Mapping[str, str] | None = None,

               result_key: str | None = None,

               persist_outputs: PersistOutputsOptions | None = None,

               restore_outputs: RestoreOutputsOptions | None = None,

               **kwargs: object) -> JobTemplateT:

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
run
def run(
    self,
    *args: object,
    **kwargs: object
) -> object

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def run(self, *args: object, **kwargs: object) -> object:

        ...

IsTask

class IsTask(
    *args,
    **kwargs
)
View Source
class IsTask(IsFuncArgJob[IsTaskTemplate], Protocol):

    """
    Structural protocol for tasks.

    Subscripts ``IsFuncArgJob`` with :py:class:`IsTaskTemplate` and declares no members of its
    own.

    NOTE(review): presumably the loosely coupled type replacement for the concrete task class
    that pairs with ``TaskTemplate`` -- confirm against the implementation.
    """

    ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJob'
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

__call__
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
__eq__
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
revise
def revise(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def revise(self) -> JobT:

        ...

IsTaskTemplate

class IsTaskTemplate(
    *args,
    **kwargs
)

Loosely coupled type replacement for the :py:class:`~omnipy.compute.task.TaskTemplate` class

View Source
class IsTaskTemplate(IsFuncArgJobTemplate['IsTaskTemplate', 'IsTask'], Protocol):

    """
    Loosely coupled type replacement for the :py:class:`~omnipy.compute.task.TaskTemplate` class.

    Structural protocol that subscripts ``IsFuncArgJobTemplate`` with forward references to
    ``'IsTaskTemplate'`` (this protocol) and ``'IsTask'`` (declared later in this module).
    It declares no members of its own; all members come from ``IsFuncArgJobTemplate``.
    """

    ...

Static methods

create_job
def create_job(
    *args: object,
    **kwargs: object
) -> IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJob'
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> 'IsJob':

        ...
create_job_template
def create_job_template(
    *args: object,
    **kwargs: object
) -> IsJobTemplate

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
'IsJobTemplate'
View Source
    @classmethod

    def create_job_template(cls, *args: object, **kwargs: object) -> 'IsJobTemplate':

        ...

Instance variables

config
engine
fixed_params
in_flow_context
iterate_over_data_files
logger
name
output_storage_protocol
output_storage_protocol_to_use
param_key_map
param_signatures
persist_outputs
restore_outputs
result_key
return_type
time_of_cur_toplevel_flow_run
unique_name
will_persist_outputs
will_restore_outputs

Methods

__call__
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        ...
__eq__
def __eq__(
    self,
    other: object
)

Return self==value.

Parameters:

Name Type Description Default
other object
View Source
    def __eq__(self, other: object):

        ...
apply
def apply(
    self
) -> +JobT

Returns:

Type Description
+JobT
View Source
    def apply(self) -> JobT:

        ...
get_call_args
def get_call_args(
    self,
    *args: object,
    **kwargs: object
) -> dict[str, object]

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
dict[str, object]
View Source
    def get_call_args(self, *args: object, **kwargs: object) -> dict[str, object]:

        ...
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        ...
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        ...
refine
def refine(
    self,
    *args: Any,
    update: bool = True,
    name: str | None = None,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    **kwargs: object
) -> +JobTemplateT

Parameters:

Name Type Description Default
args Any
update bool True
name str None
iterate_over_data_files bool False
persist_outputs omnipy.api.enums.PersistOutputsOptions None
restore_outputs omnipy.api.enums.RestoreOutputsOptions None
result_key str None
fixed_params Optional[Mapping[str, object]] None
param_key_map Optional[Mapping[str, str]] None
kwargs object

Returns:

Type Description
+JobTemplateT
View Source
    def refine(self,

               *args: Any,

               update: bool = True,

               name: str | None = None,

               iterate_over_data_files: bool = False,

               persist_outputs: PersistOutputsOptions | None = None,

               restore_outputs: RestoreOutputsOptions | None = None,

               result_key: str | None = None,

               fixed_params: Mapping[str, object] | None = None,

               param_key_map: Mapping[str, str] | None = None,

               **kwargs: object) -> JobTemplateT:

        ...
regenerate_unique_name
def regenerate_unique_name(
    self
) -> None

Returns:

Type Description
NoneType
View Source
    def regenerate_unique_name(self) -> None:

        ...
run
def run(
    self,
    *args: object,
    **kwargs: object
) -> object

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def run(self, *args: object, **kwargs: object) -> object:

        ...