Skip to content

Module omnipy.compute.flow

Overview

View Source
from typing import cast, Type

from omnipy.api.protocols.private.compute.job import (IsFuncArgJobTemplateCallable,

                                                      IsJob,

                                                      IsJobTemplate,

                                                      IsTaskTemplateArgsJobTemplateCallable)

from omnipy.api.protocols.private.engine import IsEngine

from omnipy.api.protocols.public.compute import (IsDagFlow,

                                                 IsDagFlowTemplate,

                                                 IsFuncFlow,

                                                 IsFuncFlowTemplate,

                                                 IsLinearFlow,

                                                 IsLinearFlowTemplate,

                                                 IsTaskTemplate)

from omnipy.api.protocols.public.engine import (IsDagFlowRunnerEngine,

                                                IsFuncFlowRunnerEngine,

                                                IsLinearFlowRunnerEngine)

from omnipy.compute.func_job import FuncArgJobBase

from omnipy.compute.job import JobMixin, JobTemplateMixin

from omnipy.compute.mixins.flow_context import FlowContextJobMixin

from omnipy.compute.tasklist_job import TaskTemplateArgsJobBase

from omnipy.util.callable_decorator import callable_decorator_cls

class FlowBase:

    ...

def linear_flow_template_callable_decorator_cls(

    cls: Type['LinearFlowTemplate']

) -> IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate]:

    return cast(IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate],

                callable_decorator_cls(cls))

@linear_flow_template_callable_decorator_cls

class LinearFlowTemplate(JobTemplateMixin, FlowBase, TaskTemplateArgsJobBase):

    @classmethod

    def _get_job_subcls_for_apply(cls) -> Type[IsJob]:

        return cast(Type[IsLinearFlow], LinearFlow)

class LinearFlow(JobMixin, FlowBase, TaskTemplateArgsJobBase):

    def _apply_engine_decorator(self, engine: IsEngine) -> None:

        if self.engine:

            engine = cast(IsLinearFlowRunnerEngine, self.engine)

            self_with_mixins = cast(IsLinearFlow, self)

            engine.apply_linear_flow_decorator(self_with_mixins, self._accept_call_func_decorator)

    @classmethod

    def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:

        return cast(Type[IsLinearFlowTemplate], LinearFlowTemplate)

def dag_flow_template_callable_decorator_cls(

    cls: Type['DagFlowTemplate']

) -> IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate]:

    return cast(IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate],

                callable_decorator_cls(cls))

@dag_flow_template_callable_decorator_cls

class DagFlowTemplate(JobTemplateMixin, FlowBase, TaskTemplateArgsJobBase):

    @classmethod

    def _get_job_subcls_for_apply(cls) -> Type[IsJob]:

        return cast(Type[IsDagFlow], DagFlow)

class DagFlow(JobMixin, FlowBase, TaskTemplateArgsJobBase):

    def _apply_engine_decorator(self, engine: IsEngine) -> None:

        if self.engine:

            engine = cast(IsDagFlowRunnerEngine, self.engine)

            self_with_mixins = cast(IsDagFlow, self)

            engine.apply_dag_flow_decorator(self_with_mixins, self._accept_call_func_decorator)

    @classmethod

    def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:

        return cast(Type[IsDagFlowTemplate], DagFlowTemplate)

def func_flow_template_callable_decorator_cls(

        cls: Type['FuncFlowTemplate']) -> IsFuncArgJobTemplateCallable[IsFuncFlowTemplate]:

    return cast(IsFuncArgJobTemplateCallable[IsFuncFlowTemplate], callable_decorator_cls(cls))

@func_flow_template_callable_decorator_cls

class FuncFlowTemplate(JobTemplateMixin, FlowBase, FuncArgJobBase):

    @classmethod

    def _get_job_subcls_for_apply(cls) -> Type[IsJob]:

        return cast(Type[IsFuncFlow], FuncFlow)

class FuncFlow(JobMixin, FlowBase, FuncArgJobBase):

    def _apply_engine_decorator(self, engine: IsEngine) -> None:

        if self.engine:

            engine = cast(IsFuncFlowRunnerEngine, self.engine)

            self_with_mixins = cast(IsFuncFlow, self)

            engine.apply_func_flow_decorator(self_with_mixins, self._accept_call_func_decorator)

    @classmethod

    def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:

        return cast(Type[IsFuncFlowTemplate], FuncFlowTemplate)

LinearFlow.accept_mixin(FlowContextJobMixin)

DagFlow.accept_mixin(FlowContextJobMixin)

FuncFlow.accept_mixin(FlowContextJobMixin)

# TODO: Recursive replace - *args: Any -> *args: object, *kwargs: Any -> *kwargs: object

Functions

dag_flow_template_callable_decorator_cls

def dag_flow_template_callable_decorator_cls(
    cls: Type[ForwardRef('DagFlowTemplate')]
) -> omnipy.api.protocols.private.compute.job.IsTaskTemplateArgsJobTemplateCallable[omnipy.api.protocols.public.compute.IsTaskTemplate, omnipy.api.protocols.public.compute.IsDagFlowTemplate]

Returns:

Type Description
IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate]
View Source
def dag_flow_template_callable_decorator_cls(

    cls: Type['DagFlowTemplate']

) -> IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate]:

    return cast(IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsDagFlowTemplate],

                callable_decorator_cls(cls))

func_flow_template_callable_decorator_cls

def func_flow_template_callable_decorator_cls(
    cls: Type[ForwardRef('FuncFlowTemplate')]
) -> omnipy.api.protocols.private.compute.job.IsFuncArgJobTemplateCallable[omnipy.api.protocols.public.compute.IsFuncFlowTemplate]

Returns:

Type Description
IsFuncArgJobTemplateCallable[IsFuncFlowTemplate]
View Source
def func_flow_template_callable_decorator_cls(

        cls: Type['FuncFlowTemplate']) -> IsFuncArgJobTemplateCallable[IsFuncFlowTemplate]:

    return cast(IsFuncArgJobTemplateCallable[IsFuncFlowTemplate], callable_decorator_cls(cls))

linear_flow_template_callable_decorator_cls

def linear_flow_template_callable_decorator_cls(
    cls: Type[ForwardRef('LinearFlowTemplate')]
) -> omnipy.api.protocols.private.compute.job.IsTaskTemplateArgsJobTemplateCallable[omnipy.api.protocols.public.compute.IsTaskTemplate, omnipy.api.protocols.public.compute.IsLinearFlowTemplate]

Returns:

Type Description
IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate]
View Source
def linear_flow_template_callable_decorator_cls(

    cls: Type['LinearFlowTemplate']

) -> IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate]:

    return cast(IsTaskTemplateArgsJobTemplateCallable[IsTaskTemplate, IsLinearFlowTemplate],

                callable_decorator_cls(cls))

Classes

DagFlow

class DagFlow(
    *args,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    name: str | None = None,
    **kwargs
)
View Source
class DagFlow(JobMixin, FlowBase, TaskTemplateArgsJobBase):

    def _apply_engine_decorator(self, engine: IsEngine) -> None:

        if self.engine:

            engine = cast(IsDagFlowRunnerEngine, self.engine)

            self_with_mixins = cast(IsDagFlow, self)

            engine.apply_dag_flow_decorator(self_with_mixins, self._accept_call_func_decorator)

    @classmethod

    def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:

        return cast(Type[IsDagFlowTemplate], DagFlowTemplate)

Class variables

WITH_MIXINS_CLS_PREFIX

Static methods

accept_mixin
def accept_mixin(
    mixin_cls: Type
) -> None

Parameters:

Name Type Description Default
mixin_cls Type

Returns:

Type Description
NoneType
View Source
    @classmethod

    def accept_mixin(cls, mixin_cls: Type) -> None:

        cls._accept_mixin(mixin_cls, update=True)
create_job
def create_job(
    *args: object,
    **kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJob
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> IsJob:

        cls_as_job_base = cast(IsJobBase, cls)

        return cls_as_job_base._create_job(*args, **kwargs)
reset_mixins
def reset_mixins(

)
View Source
    @classmethod

    def reset_mixins(cls):

        cls._mixin_classes.clear()

        cls._init_params_per_mixin_cls.clear()

        cls.__init__.__signature__ = cls._orig_init_signature

Instance variables

config
engine
in_flow_context
logger
task_templates
time_of_cur_toplevel_flow_run

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        self_as_job_base = cast(IsJobBase, self)

        try:

            return self_as_job_base._call_job(*args, **kwargs)

        except Exception as e:

            self_as_job_base.log(str(e), level=logging.ERROR)

            raise
eq
def __eq__(
    self,
    other: object
) -> bool

Return self==value.

Parameters:

Name Type Description Default
other object

Returns:

Type Description
bool
View Source
    def __eq__(self, other: object) -> bool:

        if not isinstance(other, JobBase):

            return NotImplemented

        return self._get_init_args() == other._get_init_args() \

            and self._get_init_kwargs() == other._get_init_kwargs()
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        return asyncio.iscoroutinefunction(self._job_func)
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        if self._logger is not None:

            create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()

            _former_log_record_factory = logging.getLogRecordFactory()

            if _former_log_record_factory.__name__ != '_log_record_editor':

                def _log_record_editor(*args, **kwargs):

                    record = _former_log_record_factory(*args, **kwargs)

                    record.created = create_time

                    record.engine = f"[{record.name.split('.')[0].upper()}]"

                    if len(record.engine) < 9:

                        record.engine += ' '

                    return record

                logging.setLogRecordFactory(_log_record_editor)

            self._logger.log(level, log_msg)
revise
def revise(
    self
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate

Returns:

Type Description
IsJobTemplate
View Source
    def revise(self) -> IsJobTemplate:

        self_as_job_base = cast(IsJobBase, self)

        job_template = self_as_job_base._revise()

        update_wrapper(job_template, self, updated=[])

        return job_template

DagFlowTemplate

class DagFlowTemplate(
    *args,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    name: str | None = None,
    **kwargs
)
View Source
@dag_flow_template_callable_decorator_cls

class DagFlowTemplate(JobTemplateMixin, FlowBase, TaskTemplateArgsJobBase):

    @classmethod

    def _get_job_subcls_for_apply(cls) -> Type[IsJob]:

        return cast(Type[IsDagFlow], DagFlow)

Class variables

WITH_MIXINS_CLS_PREFIX

Static methods

accept_mixin
def accept_mixin(
    mixin_cls: Type
) -> None

Parameters:

Name Type Description Default
mixin_cls Type

Returns:

Type Description
NoneType
View Source
    @classmethod

    def accept_mixin(cls, mixin_cls: Type) -> None:

        cls._accept_mixin(mixin_cls, update=True)
create_job_template
def create_job_template(
    *args: object,
    **kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJobTemplate
View Source
    @classmethod

    def create_job_template(cls: Type[IsJobBase], *args: object, **kwargs: object) -> IsJobTemplate:

        return cls._create_job_template(*args, **kwargs)
reset_mixins
def reset_mixins(

)
View Source
    @classmethod

    def reset_mixins(cls):

        cls._mixin_classes.clear()

        cls._init_params_per_mixin_cls.clear()

        cls.__init__.__signature__ = cls._orig_init_signature

Instance variables

config
engine
in_flow_context
logger
task_templates

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> Type[+DecoratorClassT]

call method at the class level which forward the call to instance-level call methods,

if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python only looks up special methods (with double underscores) at the class level, and not at the instance level. Used in the decoration process to forward call calls to the object level _obj_call() methods, if present.

See: https://stackoverflow.com/q/33824228

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
Type[+DecoratorClassT]
View Source
    def _forward_call_to_obj_if_callable(self, *args: object,

                                         **kwargs: object) -> Type[DecoratorClassT]:

        """

        __call__ method at the class level which forward the call to instance-level call methods,

        if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python

        only looks up special methods (with double underscores) at the class level, and not at the

        instance level. Used in the decoration process to forward __call__ calls to the object level

        _obj_call() methods, if present.

        See: https://stackoverflow.com/q/33824228

        """

        if hasattr(self, '_obj_call'):

            return self._obj_call(*args, **kwargs)

        if hasattr(self, '_wrapped_call'):

            return self._wrapped_call(*args, **kwargs)

        raise TypeError("'{}' object is not callable".format(self.__class__.__name__))
eq
def __eq__(
    self,
    other: object
) -> bool

Return self==value.

Parameters:

Name Type Description Default
other object

Returns:

Type Description
bool
View Source
    def __eq__(self, other: object) -> bool:

        if not isinstance(other, JobBase):

            return NotImplemented

        return self._get_init_args() == other._get_init_args() \

            and self._get_init_kwargs() == other._get_init_kwargs()
apply
def apply(
    self
) -> omnipy.api.protocols.private.compute.job.IsJob

Returns:

Type Description
IsJob
View Source
    def apply(self) -> IsJob:

        self_as_job_base = cast(IsJobBase, self)

        job = self_as_job_base._apply()

        update_wrapper(job, self, updated=[])

        return job
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        return asyncio.iscoroutinefunction(self._job_func)
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        if self._logger is not None:

            create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()

            _former_log_record_factory = logging.getLogRecordFactory()

            if _former_log_record_factory.__name__ != '_log_record_editor':

                def _log_record_editor(*args, **kwargs):

                    record = _former_log_record_factory(*args, **kwargs)

                    record.created = create_time

                    record.engine = f"[{record.name.split('.')[0].upper()}]"

                    if len(record.engine) < 9:

                        record.engine += ' '

                    return record

                logging.setLogRecordFactory(_log_record_editor)

            self._logger.log(level, log_msg)
refine
def refine(
    self,
    *args: Any,
    update: bool = True,
    **kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate

Parameters:

Name Type Description Default
args Any
update bool True
kwargs object

Returns:

Type Description
IsJobTemplate
View Source
    def refine(self, *args: Any, update: bool = True, **kwargs: object) -> IsJobTemplate:

        self_as_job_base = cast(IsJobBase, self)

        return self_as_job_base._refine(*args, update=update, **kwargs)
run
def run(
    self,
    *args: object,
    **kwargs: object
) -> object

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def run(self, *args: object, **kwargs: object) -> object:

        # TODO: Using JobTemplateMixin.run() inside flows should give error message

        return self.apply()(*args, **kwargs)

FlowBase

class FlowBase(
    /,
    *args,
    **kwargs
)
View Source
class FlowBase:

    ...

FuncFlow

class FuncFlow(
    *args,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    name: str | None = None,
    **kwargs
)
View Source
class FuncFlow(JobMixin, FlowBase, FuncArgJobBase):

    def _apply_engine_decorator(self, engine: IsEngine) -> None:

        if self.engine:

            engine = cast(IsFuncFlowRunnerEngine, self.engine)

            self_with_mixins = cast(IsFuncFlow, self)

            engine.apply_func_flow_decorator(self_with_mixins, self._accept_call_func_decorator)

    @classmethod

    def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:

        return cast(Type[IsFuncFlowTemplate], FuncFlowTemplate)

Class variables

WITH_MIXINS_CLS_PREFIX

Static methods

accept_mixin
def accept_mixin(
    mixin_cls: Type
) -> None

Parameters:

Name Type Description Default
mixin_cls Type

Returns:

Type Description
NoneType
View Source
    @classmethod

    def accept_mixin(cls, mixin_cls: Type) -> None:

        cls._accept_mixin(mixin_cls, update=True)
create_job
def create_job(
    *args: object,
    **kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJob
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> IsJob:

        cls_as_job_base = cast(IsJobBase, cls)

        return cls_as_job_base._create_job(*args, **kwargs)
reset_mixins
def reset_mixins(

)
View Source
    @classmethod

    def reset_mixins(cls):

        cls._mixin_classes.clear()

        cls._init_params_per_mixin_cls.clear()

        cls.__init__.__signature__ = cls._orig_init_signature

Instance variables

config
engine
in_flow_context
logger
time_of_cur_toplevel_flow_run

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        self_as_job_base = cast(IsJobBase, self)

        try:

            return self_as_job_base._call_job(*args, **kwargs)

        except Exception as e:

            self_as_job_base.log(str(e), level=logging.ERROR)

            raise
eq
def __eq__(
    self,
    other: object
) -> bool

Return self==value.

Parameters:

Name Type Description Default
other object

Returns:

Type Description
bool
View Source
    def __eq__(self, other: object) -> bool:

        if not isinstance(other, JobBase):

            return NotImplemented

        return self._get_init_args() == other._get_init_args() \

            and self._get_init_kwargs() == other._get_init_kwargs()
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        return asyncio.iscoroutinefunction(self._job_func)
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        if self._logger is not None:

            create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()

            _former_log_record_factory = logging.getLogRecordFactory()

            if _former_log_record_factory.__name__ != '_log_record_editor':

                def _log_record_editor(*args, **kwargs):

                    record = _former_log_record_factory(*args, **kwargs)

                    record.created = create_time

                    record.engine = f"[{record.name.split('.')[0].upper()}]"

                    if len(record.engine) < 9:

                        record.engine += ' '

                    return record

                logging.setLogRecordFactory(_log_record_editor)

            self._logger.log(level, log_msg)
revise
def revise(
    self
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate

Returns:

Type Description
IsJobTemplate
View Source
    def revise(self) -> IsJobTemplate:

        self_as_job_base = cast(IsJobBase, self)

        job_template = self_as_job_base._revise()

        update_wrapper(job_template, self, updated=[])

        return job_template

FuncFlowTemplate

class FuncFlowTemplate(
    *args,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    name: str | None = None,
    **kwargs
)
View Source
@func_flow_template_callable_decorator_cls

class FuncFlowTemplate(JobTemplateMixin, FlowBase, FuncArgJobBase):

    @classmethod

    def _get_job_subcls_for_apply(cls) -> Type[IsJob]:

        return cast(Type[IsFuncFlow], FuncFlow)

Class variables

WITH_MIXINS_CLS_PREFIX

Static methods

accept_mixin
def accept_mixin(
    mixin_cls: Type
) -> None

Parameters:

Name Type Description Default
mixin_cls Type

Returns:

Type Description
NoneType
View Source
    @classmethod

    def accept_mixin(cls, mixin_cls: Type) -> None:

        cls._accept_mixin(mixin_cls, update=True)
create_job_template
def create_job_template(
    *args: object,
    **kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJobTemplate
View Source
    @classmethod

    def create_job_template(cls: Type[IsJobBase], *args: object, **kwargs: object) -> IsJobTemplate:

        return cls._create_job_template(*args, **kwargs)
reset_mixins
def reset_mixins(

)
View Source
    @classmethod

    def reset_mixins(cls):

        cls._mixin_classes.clear()

        cls._init_params_per_mixin_cls.clear()

        cls.__init__.__signature__ = cls._orig_init_signature

Instance variables

config
engine
in_flow_context
logger

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> Type[+DecoratorClassT]

call method at the class level which forward the call to instance-level call methods,

if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python only looks up special methods (with double underscores) at the class level, and not at the instance level. Used in the decoration process to forward call calls to the object level _obj_call() methods, if present.

See: https://stackoverflow.com/q/33824228

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
Type[+DecoratorClassT]
View Source
    def _forward_call_to_obj_if_callable(self, *args: object,

                                         **kwargs: object) -> Type[DecoratorClassT]:

        """

        __call__ method at the class level which forward the call to instance-level call methods,

        if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python

        only looks up special methods (with double underscores) at the class level, and not at the

        instance level. Used in the decoration process to forward __call__ calls to the object level

        _obj_call() methods, if present.

        See: https://stackoverflow.com/q/33824228

        """

        if hasattr(self, '_obj_call'):

            return self._obj_call(*args, **kwargs)

        if hasattr(self, '_wrapped_call'):

            return self._wrapped_call(*args, **kwargs)

        raise TypeError("'{}' object is not callable".format(self.__class__.__name__))
eq
def __eq__(
    self,
    other: object
) -> bool

Return self==value.

Parameters:

Name Type Description Default
other object

Returns:

Type Description
bool
View Source
    def __eq__(self, other: object) -> bool:

        if not isinstance(other, JobBase):

            return NotImplemented

        return self._get_init_args() == other._get_init_args() \

            and self._get_init_kwargs() == other._get_init_kwargs()
apply
def apply(
    self
) -> omnipy.api.protocols.private.compute.job.IsJob

Returns:

Type Description
IsJob
View Source
    def apply(self) -> IsJob:

        self_as_job_base = cast(IsJobBase, self)

        job = self_as_job_base._apply()

        update_wrapper(job, self, updated=[])

        return job
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        return asyncio.iscoroutinefunction(self._job_func)
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        if self._logger is not None:

            create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()

            _former_log_record_factory = logging.getLogRecordFactory()

            if _former_log_record_factory.__name__ != '_log_record_editor':

                def _log_record_editor(*args, **kwargs):

                    record = _former_log_record_factory(*args, **kwargs)

                    record.created = create_time

                    record.engine = f"[{record.name.split('.')[0].upper()}]"

                    if len(record.engine) < 9:

                        record.engine += ' '

                    return record

                logging.setLogRecordFactory(_log_record_editor)

            self._logger.log(level, log_msg)
refine
def refine(
    self,
    *args: Any,
    update: bool = True,
    **kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate

Parameters:

Name Type Description Default
args Any
update bool True
kwargs object

Returns:

Type Description
IsJobTemplate
View Source
    def refine(self, *args: Any, update: bool = True, **kwargs: object) -> IsJobTemplate:

        self_as_job_base = cast(IsJobBase, self)

        return self_as_job_base._refine(*args, update=update, **kwargs)
run
def run(
    self,
    *args: object,
    **kwargs: object
) -> object

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def run(self, *args: object, **kwargs: object) -> object:

        # TODO: Using JobTemplateMixin.run() inside flows should give error message

        return self.apply()(*args, **kwargs)

LinearFlow

class LinearFlow(
    *args,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    name: str | None = None,
    **kwargs
)
View Source
class LinearFlow(JobMixin, FlowBase, TaskTemplateArgsJobBase):

    def _apply_engine_decorator(self, engine: IsEngine) -> None:

        if self.engine:

            engine = cast(IsLinearFlowRunnerEngine, self.engine)

            self_with_mixins = cast(IsLinearFlow, self)

            engine.apply_linear_flow_decorator(self_with_mixins, self._accept_call_func_decorator)

    @classmethod

    def _get_job_template_subcls_for_revise(cls) -> Type[IsJobTemplate]:

        return cast(Type[IsLinearFlowTemplate], LinearFlowTemplate)

Class variables

WITH_MIXINS_CLS_PREFIX

Static methods

accept_mixin
def accept_mixin(
    mixin_cls: Type
) -> None

Parameters:

Name Type Description Default
mixin_cls Type

Returns:

Type Description
NoneType
View Source
    @classmethod

    def accept_mixin(cls, mixin_cls: Type) -> None:

        cls._accept_mixin(mixin_cls, update=True)
create_job
def create_job(
    *args: object,
    **kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJob

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJob
View Source
    @classmethod

    def create_job(cls, *args: object, **kwargs: object) -> IsJob:

        cls_as_job_base = cast(IsJobBase, cls)

        return cls_as_job_base._create_job(*args, **kwargs)
reset_mixins
def reset_mixins(

)
View Source
    @classmethod

    def reset_mixins(cls):

        cls._mixin_classes.clear()

        cls._init_params_per_mixin_cls.clear()

        cls.__init__.__signature__ = cls._orig_init_signature

Instance variables

config
engine
in_flow_context
logger
task_templates
time_of_cur_toplevel_flow_run

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> object

Call self as a function.

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def __call__(self, *args: object, **kwargs: object) -> object:

        self_as_job_base = cast(IsJobBase, self)

        try:

            return self_as_job_base._call_job(*args, **kwargs)

        except Exception as e:

            self_as_job_base.log(str(e), level=logging.ERROR)

            raise
eq
def __eq__(
    self,
    other: object
) -> bool

Return self==value.

Parameters:

Name Type Description Default
other object

Returns:

Type Description
bool
View Source
    def __eq__(self, other: object) -> bool:

        if not isinstance(other, JobBase):

            return NotImplemented

        return self._get_init_args() == other._get_init_args() \

            and self._get_init_kwargs() == other._get_init_kwargs()
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        return asyncio.iscoroutinefunction(self._job_func)
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        if self._logger is not None:

            create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()

            _former_log_record_factory = logging.getLogRecordFactory()

            if _former_log_record_factory.__name__ != '_log_record_editor':

                def _log_record_editor(*args, **kwargs):

                    record = _former_log_record_factory(*args, **kwargs)

                    record.created = create_time

                    record.engine = f"[{record.name.split('.')[0].upper()}]"

                    if len(record.engine) < 9:

                        record.engine += ' '

                    return record

                logging.setLogRecordFactory(_log_record_editor)

            self._logger.log(level, log_msg)
revise
def revise(
    self
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate

Returns:

Type Description
IsJobTemplate
View Source
    def revise(self) -> IsJobTemplate:

        self_as_job_base = cast(IsJobBase, self)

        job_template = self_as_job_base._revise()

        update_wrapper(job_template, self, updated=[])

        return job_template

LinearFlowTemplate

class LinearFlowTemplate(
    *args,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    name: str | None = None,
    **kwargs
)
View Source
@linear_flow_template_callable_decorator_cls

class LinearFlowTemplate(JobTemplateMixin, FlowBase, TaskTemplateArgsJobBase):

    @classmethod

    def _get_job_subcls_for_apply(cls) -> Type[IsJob]:

        return cast(Type[IsLinearFlow], LinearFlow)

Class variables

WITH_MIXINS_CLS_PREFIX

Static methods

accept_mixin
def accept_mixin(
    mixin_cls: Type
) -> None

Parameters:

Name Type Description Default
mixin_cls Type

Returns:

Type Description
NoneType
View Source
    @classmethod

    def accept_mixin(cls, mixin_cls: Type) -> None:

        cls._accept_mixin(mixin_cls, update=True)
create_job_template
def create_job_template(
    *args: object,
    **kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
IsJobTemplate
View Source
    @classmethod

    def create_job_template(cls: Type[IsJobBase], *args: object, **kwargs: object) -> IsJobTemplate:

        return cls._create_job_template(*args, **kwargs)
reset_mixins
def reset_mixins(

)
View Source
    @classmethod

    def reset_mixins(cls):

        cls._mixin_classes.clear()

        cls._init_params_per_mixin_cls.clear()

        cls.__init__.__signature__ = cls._orig_init_signature

Instance variables

config
engine
in_flow_context
logger
task_templates

Methods

call
def __call__(
    self,
    *args: object,
    **kwargs: object
) -> Type[+DecoratorClassT]

call method at the class level which forward the call to instance-level call methods,

if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python only looks up special methods (with double underscores) at the class level, and not at the instance level. Used in the decoration process to forward call calls to the object level _obj_call() methods, if present.

See: https://stackoverflow.com/q/33824228

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
Type[+DecoratorClassT]
View Source
    def _forward_call_to_obj_if_callable(self, *args: object,

                                         **kwargs: object) -> Type[DecoratorClassT]:

        """

        __call__ method at the class level which forward the call to instance-level call methods,

        if present (hardcoded as '_obj_call()'). This is needed due to the peculiarity that Python

        only looks up special methods (with double underscores) at the class level, and not at the

        instance level. Used in the decoration process to forward __call__ calls to the object level

        _obj_call() methods, if present.

        See: https://stackoverflow.com/q/33824228

        """

        if hasattr(self, '_obj_call'):

            return self._obj_call(*args, **kwargs)

        if hasattr(self, '_wrapped_call'):

            return self._wrapped_call(*args, **kwargs)

        raise TypeError("'{}' object is not callable".format(self.__class__.__name__))
eq
def __eq__(
    self,
    other: object
) -> bool

Return self==value.

Parameters:

Name Type Description Default
other object

Returns:

Type Description
bool
View Source
    def __eq__(self, other: object) -> bool:

        if not isinstance(other, JobBase):

            return NotImplemented

        return self._get_init_args() == other._get_init_args() \

            and self._get_init_kwargs() == other._get_init_kwargs()
apply
def apply(
    self
) -> omnipy.api.protocols.private.compute.job.IsJob

Returns:

Type Description
IsJob
View Source
    def apply(self) -> IsJob:

        self_as_job_base = cast(IsJobBase, self)

        job = self_as_job_base._apply()

        update_wrapper(job, self, updated=[])

        return job
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:

        return asyncio.iscoroutinefunction(self._job_func)
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        if self._logger is not None:

            create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()

            _former_log_record_factory = logging.getLogRecordFactory()

            if _former_log_record_factory.__name__ != '_log_record_editor':

                def _log_record_editor(*args, **kwargs):

                    record = _former_log_record_factory(*args, **kwargs)

                    record.created = create_time

                    record.engine = f"[{record.name.split('.')[0].upper()}]"

                    if len(record.engine) < 9:

                        record.engine += ' '

                    return record

                logging.setLogRecordFactory(_log_record_editor)

            self._logger.log(level, log_msg)
refine
def refine(
    self,
    *args: Any,
    update: bool = True,
    **kwargs: object
) -> omnipy.api.protocols.private.compute.job.IsJobTemplate

Parameters:

Name Type Description Default
args Any
update bool True
kwargs object

Returns:

Type Description
IsJobTemplate
View Source
    def refine(self, *args: Any, update: bool = True, **kwargs: object) -> IsJobTemplate:

        self_as_job_base = cast(IsJobBase, self)

        return self_as_job_base._refine(*args, update=update, **kwargs)
run
def run(
    self,
    *args: object,
    **kwargs: object
) -> object

Parameters:

Name Type Description Default
args object
kwargs object

Returns:

Type Description
object
View Source
    def run(self, *args: object, **kwargs: object) -> object:

        # TODO: Using JobTemplateMixin.run() inside flows should give error message

        return self.apply()(*args, **kwargs)