Skip to content

Module omnipy.compute.func_job

Overview

View Source
import asyncio

from typing import Callable

from omnipy.api.typedefs import GeneralDecorator

from omnipy.compute.job import JobBase

from omnipy.compute.mixins.func_signature import SignatureFuncJobBaseMixin

from omnipy.compute.mixins.iterate import IterateFuncJobBaseMixin

from omnipy.compute.mixins.params import ParamsFuncJobBaseMixin

from omnipy.compute.mixins.result_key import ResultKeyFuncJobBaseMixin

from omnipy.compute.mixins.serialize import SerializerFuncJobBaseMixin

class PlainFuncArgJobBase(JobBase):
    """Job base that wraps a single callable, kept in ``self._job_func``.

    Calls go through two layers: ``_call_job`` (the hook mixins overload)
    forwards to ``_call_func`` (the hook that accepts decorators), which in
    turn invokes the wrapped callable.
    """

    def __init__(self, job_func: Callable, *args: object, **kwargs: object) -> None:
        # Only the callable is stored here; ``args`` and ``kwargs`` are not
        # used by this initializer.
        self._job_func = job_func

    def _get_init_args(self) -> tuple[object, ...]:
        """Return the positional init arguments: a 1-tuple holding the job callable."""
        return (self._job_func,)

    def has_coroutine_func(self) -> bool:
        """Return what ``asyncio.iscoroutinefunction`` reports for the job callable."""
        job_func = self._job_func
        return asyncio.iscoroutinefunction(job_func)

    def _call_job(self, *args: object, **kwargs: object) -> object:
        """Hook meant to be overloaded by mixins; forwards to ``_call_func``."""
        result = self._call_func(*args, **kwargs)
        return result

    def _call_func(self, *args: object, **kwargs: object) -> object:
        """
        Invoke the wrapped job callable with the given arguments.

        Meant to be decorated by job runners and mixins that need early
        application, not overloaded through inheritance. Decorators are
        applied with ``_accept_call_func_decorator``.
        """
        job_func = self._job_func
        return job_func(*args, **kwargs)

    def _accept_call_func_decorator(self, call_func_decorator: GeneralDecorator) -> None:
        """Wrap this instance's ``_call_func`` with ``call_func_decorator``."""
        decorated_call_func = call_func_decorator(self._call_func)
        # Stored on the instance, so only this job object is affected.
        self._call_func = decorated_call_func  # type:ignore

# Extra level needed for mixins to be able to overload _call_job (and possibly other methods)
class FuncArgJobBase(PlainFuncArgJobBase):
    """Subclass of ``PlainFuncArgJobBase`` that adds no members of its own."""


# Mixins are registered one at a time, in the order listed.
for _mixin_cls in (
        SignatureFuncJobBaseMixin,
        IterateFuncJobBaseMixin,
        SerializerFuncJobBaseMixin,
        ResultKeyFuncJobBaseMixin,
        ParamsFuncJobBaseMixin,
):
    FuncArgJobBase.accept_mixin(_mixin_cls)

# Do not leave the loop variable behind in the module namespace.
del _mixin_cls

Classes

FuncArgJobBase

class FuncArgJobBase(
    job_func: Callable,
    *args: object,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    name: str | None = None,
    **kwargs: object
)
View Source
class FuncArgJobBase(PlainFuncArgJobBase):
    """Subclass of ``PlainFuncArgJobBase`` that declares no members of its own.

    Mixins are attached to this class through ``accept_mixin`` calls at
    module level.
    """

Class variables

WITH_MIXINS_CLS_PREFIX

Static methods

accept_mixin
def accept_mixin(
    mixin_cls: Type
) -> None

Parameters:

Name Type Description Default
mixin_cls Type

Returns:

Type Description
NoneType
View Source
    @classmethod
    def accept_mixin(cls, mixin_cls: Type) -> None:
        """Pass ``mixin_cls`` on to ``cls._accept_mixin`` with ``update=True``.

        Args:
            mixin_cls: The mixin class handed to ``_accept_mixin``.
        """
        register = cls._accept_mixin
        register(mixin_cls, update=True)
reset_mixins
def reset_mixins(

)
View Source
    @classmethod
    def reset_mixins(cls):
        """Empty the mixin bookkeeping and reset the ``__init__`` signature.

        Clears ``cls._mixin_classes`` and ``cls._init_params_per_mixin_cls``,
        then sets ``cls.__init__.__signature__`` back to
        ``cls._orig_init_signature``.
        """
        cls._mixin_classes.clear()
        cls._init_params_per_mixin_cls.clear()

        original_signature = cls._orig_init_signature
        init_func = cls.__init__
        init_func.__signature__ = original_signature

Instance variables

config
engine
in_flow_context
logger

Methods

__eq__
def __eq__(
    self,
    other: object
) -> bool

Return self==value.

Parameters:

Name Type Description Default
other object

Returns:

Type Description
bool
View Source
    def __eq__(self, other: object) -> bool:
        """Compare two jobs by their init arguments.

        Returns ``NotImplemented`` when ``other`` is not a ``JobBase``.
        Otherwise the jobs are equal when both ``_get_init_args()`` and
        ``_get_init_kwargs()`` compare equal.
        """
        if isinstance(other, JobBase):
            same_args = self._get_init_args() == other._get_init_args()
            # Keyword arguments are only compared when the positional ones match.
            return same_args and self._get_init_kwargs() == other._get_init_kwargs()

        return NotImplemented
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:
        """Tell whether the job callable is a coroutine function.

        Returns:
            The result of ``asyncio.iscoroutinefunction`` for ``self._job_func``.
        """
        job_func = self._job_func
        return asyncio.iscoroutinefunction(job_func)
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        if self._logger is not None:

            create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()

            _former_log_record_factory = logging.getLogRecordFactory()

            if _former_log_record_factory.__name__ != '_log_record_editor':

                def _log_record_editor(*args, **kwargs):

                    record = _former_log_record_factory(*args, **kwargs)

                    record.created = create_time

                    record.engine = f"[{record.name.split('.')[0].upper()}]"

                    if len(record.engine) < 9:

                        record.engine += ' '

                    return record

                logging.setLogRecordFactory(_log_record_editor)

            self._logger.log(level, log_msg)

PlainFuncArgJobBase

class PlainFuncArgJobBase(
    job_func: Callable,
    *args: object,
    iterate_over_data_files: bool = False,
    persist_outputs: omnipy.api.enums.PersistOutputsOptions | None = None,
    restore_outputs: omnipy.api.enums.RestoreOutputsOptions | None = None,
    output_storage_protocol: omnipy.api.enums.OutputStorageProtocolOptions | None = None,
    result_key: str | None = None,
    fixed_params: Optional[Mapping[str, object]] = None,
    param_key_map: Optional[Mapping[str, str]] = None,
    name: str | None = None,
    **kwargs: object
)
View Source
class PlainFuncArgJobBase(JobBase):
    """Job base that wraps a single callable, kept in ``self._job_func``.

    Calls go through two layers: ``_call_job`` (the hook mixins overload)
    forwards to ``_call_func`` (the hook that accepts decorators), which in
    turn invokes the wrapped callable.
    """

    def __init__(self, job_func: Callable, *args: object, **kwargs: object) -> None:
        # Only the callable is stored here; ``args`` and ``kwargs`` are not
        # used by this initializer.
        self._job_func = job_func

    def _get_init_args(self) -> tuple[object, ...]:
        """Return the positional init arguments: a 1-tuple holding the job callable."""
        return (self._job_func,)

    def has_coroutine_func(self) -> bool:
        """Return what ``asyncio.iscoroutinefunction`` reports for the job callable."""
        job_func = self._job_func
        return asyncio.iscoroutinefunction(job_func)

    def _call_job(self, *args: object, **kwargs: object) -> object:
        """Hook meant to be overloaded by mixins; forwards to ``_call_func``."""
        result = self._call_func(*args, **kwargs)
        return result

    def _call_func(self, *args: object, **kwargs: object) -> object:
        """
        Invoke the wrapped job callable with the given arguments.

        Meant to be decorated by job runners and mixins that need early
        application, not overloaded through inheritance. Decorators are
        applied with ``_accept_call_func_decorator``.
        """
        job_func = self._job_func
        return job_func(*args, **kwargs)

    def _accept_call_func_decorator(self, call_func_decorator: GeneralDecorator) -> None:
        """Wrap this instance's ``_call_func`` with ``call_func_decorator``."""
        decorated_call_func = call_func_decorator(self._call_func)
        # Stored on the instance, so only this job object is affected.
        self._call_func = decorated_call_func  # type:ignore

Class variables

WITH_MIXINS_CLS_PREFIX

Static methods

accept_mixin
def accept_mixin(
    mixin_cls: Type
) -> None

Parameters:

Name Type Description Default
mixin_cls Type

Returns:

Type Description
NoneType
View Source
    @classmethod
    def accept_mixin(cls, mixin_cls: Type) -> None:
        """Pass ``mixin_cls`` on to ``cls._accept_mixin`` with ``update=True``.

        Args:
            mixin_cls: The mixin class handed to ``_accept_mixin``.
        """
        register = cls._accept_mixin
        register(mixin_cls, update=True)
reset_mixins
def reset_mixins(

)
View Source
    @classmethod
    def reset_mixins(cls):
        """Empty the mixin bookkeeping and reset the ``__init__`` signature.

        Clears ``cls._mixin_classes`` and ``cls._init_params_per_mixin_cls``,
        then sets ``cls.__init__.__signature__`` back to
        ``cls._orig_init_signature``.
        """
        cls._mixin_classes.clear()
        cls._init_params_per_mixin_cls.clear()

        original_signature = cls._orig_init_signature
        init_func = cls.__init__
        init_func.__signature__ = original_signature

Instance variables

config
engine
in_flow_context
logger

Methods

__eq__
def __eq__(
    self,
    other: object
) -> bool

Return self==value.

Parameters:

Name Type Description Default
other object

Returns:

Type Description
bool
View Source
    def __eq__(self, other: object) -> bool:
        """Compare two jobs by their init arguments.

        Returns ``NotImplemented`` when ``other`` is not a ``JobBase``.
        Otherwise the jobs are equal when both ``_get_init_args()`` and
        ``_get_init_kwargs()`` compare equal.
        """
        if isinstance(other, JobBase):
            same_args = self._get_init_args() == other._get_init_args()
            # Keyword arguments are only compared when the positional ones match.
            return same_args and self._get_init_kwargs() == other._get_init_kwargs()

        return NotImplemented
has_coroutine_func
def has_coroutine_func(
    self
) -> bool

Returns:

Type Description
bool
View Source
    def has_coroutine_func(self) -> bool:
        """Tell whether the job callable is a coroutine function.

        Returns:
            The result of ``asyncio.iscoroutinefunction`` for ``self._job_func``.
        """
        job_func = self._job_func
        return asyncio.iscoroutinefunction(job_func)
log
def log(
    self,
    log_msg: str,
    level: int = 20,
    datetime_obj: datetime.datetime | None = None
)

Parameters:

Name Type Description Default
log_msg str
level int 20
datetime_obj datetime.datetime None
View Source
    def log(self, log_msg: str, level: int = INFO, datetime_obj: datetime | None = None):

        if self._logger is not None:

            create_time = time.mktime(datetime_obj.timetuple()) if datetime_obj else time.time()

            _former_log_record_factory = logging.getLogRecordFactory()

            if _former_log_record_factory.__name__ != '_log_record_editor':

                def _log_record_editor(*args, **kwargs):

                    record = _former_log_record_factory(*args, **kwargs)

                    record.created = create_time

                    record.engine = f"[{record.name.split('.')[0].upper()}]"

                    if len(record.engine) < 9:

                        record.engine += ' '

                    return record

                logging.setLogRecordFactory(_log_record_editor)

            self._logger.log(level, log_msg)