Skip to content

omnipy.engine.job_runner

CLASS DESCRIPTION
DagFlowRunnerEngine

Base class for DAG flow runner engine implementations

FuncFlowRunnerEngine

Base class for function flow runner engine implementations

JobRunnerEngine

Base class for job runner engine implementations

LinearFlowRunnerEngine

Base class for linear flow runner engine implementations

TaskRunnerEngine

Base class for task runner engine implementations

DagFlowRunnerEngine

Bases: JobRunnerEngine

Base class for DAG flow runner engine implementations

METHOD DESCRIPTION
__init__
apply_dag_flow_decorator
default_dag_flow_run_decorator
get_config_cls

Specification of config class mapped to an Engine subclass. Must be implemented by all

set_config
set_registry
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobRunnerConfig

registry

TYPE: IsRunStateRegistry | None

Source code in src/omnipy/engine/job_runner.py
class DagFlowRunnerEngine(JobRunnerEngine):
    """Base class for DAG flow runner engine implementations"""
    def apply_dag_flow_decorator(self, dag_flow: IsDagFlow,
                                 job_callback_accept_decorator: Callable) -> None:
        def _dag_flow_decorator(call_func: Callable) -> Callable:
            self._register_job_state(dag_flow, RunState.INITIALIZED)
            state = self._init_dag_flow(dag_flow)

            def _dag_flow_runner_call_func(*args: object, **kwargs: object) -> Any:
                self._register_job_state(dag_flow, RunState.RUNNING)
                flow_result = self._run_dag_flow(state, dag_flow, *args, **kwargs)
                return self._decorate_result_with_job_finalization_detector(dag_flow, flow_result)

            return _dag_flow_runner_call_func

        job_callback_accept_decorator(_dag_flow_decorator)

    @staticmethod
    def default_dag_flow_run_decorator(dag_flow: IsDagFlow) -> Callable:  # noqa: C901
        def _inner_run_dag_flow(*args: object, **kwargs: object):
            results = {}
            result = None
            with dag_flow.flow_context:
                for i, job in enumerate(dag_flow.task_templates):
                    if i == 0:
                        results = dag_flow.get_bound_args(*args, **kwargs).arguments

                    param_keys = set(inspect.signature(job).parameters.keys())

                    # TODO: Refactor to remove dependency
                    #       Also, add test for not allowing override of fixed_params
                    if hasattr(job, 'param_key_map'):
                        for key, val in job.param_key_map.items():
                            if key in param_keys:
                                param_keys.remove(key)
                                param_keys.add(val)

                    if hasattr(job, 'fixed_params'):
                        for key in job.fixed_params.keys():
                            if key in param_keys:
                                param_keys.remove(key)

                    params = {key: val for key, val in results.items() if key in param_keys}
                    result = job(**params)

                    if isinstance(result, dict) and len(result) > 0:
                        results.update(result)
                    else:
                        results[job.name] = result
            return result

        return _inner_run_dag_flow

    @abstractmethod
    def _init_dag_flow(self, dag_flow: IsDagFlow) -> Any:
        ...

    @abstractmethod
    def _run_dag_flow(self, state: Any, dag_flow: IsDagFlow, *args, **kwargs) -> Any:
        ...

config property

registry property

registry: IsRunStateRegistry | None

__init__

__init__() -> None
Source code in src/omnipy/engine/_base.py
def __init__(self) -> None:
    config_cls = self.get_config_cls()
    self._config: IsJobRunnerConfig = config_cls()
    self._registry: IsRunStateRegistry | None = None

    self._init_engine()

apply_dag_flow_decorator

apply_dag_flow_decorator(dag_flow: IsDagFlow, job_callback_accept_decorator: Callable) -> None
Source code in src/omnipy/engine/job_runner.py
def apply_dag_flow_decorator(self, dag_flow: IsDagFlow,
                             job_callback_accept_decorator: Callable) -> None:
    def _dag_flow_decorator(call_func: Callable) -> Callable:
        self._register_job_state(dag_flow, RunState.INITIALIZED)
        state = self._init_dag_flow(dag_flow)

        def _dag_flow_runner_call_func(*args: object, **kwargs: object) -> Any:
            self._register_job_state(dag_flow, RunState.RUNNING)
            flow_result = self._run_dag_flow(state, dag_flow, *args, **kwargs)
            return self._decorate_result_with_job_finalization_detector(dag_flow, flow_result)

        return _dag_flow_runner_call_func

    job_callback_accept_decorator(_dag_flow_decorator)

default_dag_flow_run_decorator staticmethod

default_dag_flow_run_decorator(dag_flow: IsDagFlow) -> Callable
Source code in src/omnipy/engine/job_runner.py
@staticmethod
def default_dag_flow_run_decorator(dag_flow: IsDagFlow) -> Callable:  # noqa: C901
    def _inner_run_dag_flow(*args: object, **kwargs: object):
        results = {}
        result = None
        with dag_flow.flow_context:
            for i, job in enumerate(dag_flow.task_templates):
                if i == 0:
                    results = dag_flow.get_bound_args(*args, **kwargs).arguments

                param_keys = set(inspect.signature(job).parameters.keys())

                # TODO: Refactor to remove dependency
                #       Also, add test for not allowing override of fixed_params
                if hasattr(job, 'param_key_map'):
                    for key, val in job.param_key_map.items():
                        if key in param_keys:
                            param_keys.remove(key)
                            param_keys.add(val)

                if hasattr(job, 'fixed_params'):
                    for key in job.fixed_params.keys():
                        if key in param_keys:
                            param_keys.remove(key)

                params = {key: val for key, val in results.items() if key in param_keys}
                result = job(**params)

                if isinstance(result, dict) and len(result) > 0:
                    results.update(result)
                else:
                    results[job.name] = result
        return result

    return _inner_run_dag_flow

get_config_cls abstractmethod classmethod

get_config_cls() -> Type[IsJobRunnerConfig]

Specification of config class mapped to an Engine subclass. Must be implemented by all subclasses of Engine. If no configuration is needed, then the EngineConfig class should be returned. :return: Class implementing the IsEngineConfig protocol

Source code in src/omnipy/engine/_base.py
@classmethod
@abstractmethod
def get_config_cls(cls) -> Type[IsJobRunnerConfig]:
    """
    Specification of config class mapped to an Engine subclass. Must be implemented by all
    subclasses of Engine. If no configuration is needed, then the EngineConfig class should be
    returned.
    :return: Class implementing the IsEngineConfig protocol
    """

set_config

set_config(config: IsJobRunnerConfig) -> None
Source code in src/omnipy/engine/_base.py
def set_config(self, config: IsJobRunnerConfig) -> None:
    self._config = config
    self._update_from_config()

set_registry

set_registry(registry: IsRunStateRegistry | None) -> None
Source code in src/omnipy/engine/_base.py
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
    self._registry = registry

FuncFlowRunnerEngine

Bases: JobRunnerEngine

Base class for function flow runner engine implementations

METHOD DESCRIPTION
__init__
apply_func_flow_decorator
get_config_cls

Specification of config class mapped to an Engine subclass. Must be implemented by all

set_config
set_registry
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobRunnerConfig

registry

TYPE: IsRunStateRegistry | None

Source code in src/omnipy/engine/job_runner.py
class FuncFlowRunnerEngine(JobRunnerEngine):
    """Base class for function flow runner engine implementations"""
    def apply_func_flow_decorator(self,
                                  func_flow: IsFuncFlow,
                                  job_callback_accept_decorator: Callable) -> None:
        def _func_flow_decorator(call_func: Callable) -> Callable:
            self._register_job_state(func_flow, RunState.INITIALIZED)
            state = self._init_func_flow(func_flow, call_func)

            def _func_flow_runner_call_func(*args: object, **kwargs: object) -> Any:
                self._register_job_state(func_flow, RunState.RUNNING)
                with func_flow.flow_context:
                    flow_result = self._run_func_flow(state, func_flow, *args, **kwargs)
                    return self._decorate_result_with_job_finalization_detector(
                        func_flow, flow_result)

            return _func_flow_runner_call_func

        job_callback_accept_decorator(_func_flow_decorator)

    @abstractmethod
    def _init_func_flow(self, func_flow: IsFuncFlow, call_func: Callable) -> object:
        ...

    @abstractmethod
    def _run_func_flow(self, state: Any, func_flow: IsFuncFlow, *args, **kwargs) -> Any:
        ...

config property

registry property

registry: IsRunStateRegistry | None

__init__

__init__() -> None
Source code in src/omnipy/engine/_base.py
def __init__(self) -> None:
    config_cls = self.get_config_cls()
    self._config: IsJobRunnerConfig = config_cls()
    self._registry: IsRunStateRegistry | None = None

    self._init_engine()

apply_func_flow_decorator

apply_func_flow_decorator(func_flow: IsFuncFlow, job_callback_accept_decorator: Callable) -> None
Source code in src/omnipy/engine/job_runner.py
def apply_func_flow_decorator(self,
                              func_flow: IsFuncFlow,
                              job_callback_accept_decorator: Callable) -> None:
    def _func_flow_decorator(call_func: Callable) -> Callable:
        self._register_job_state(func_flow, RunState.INITIALIZED)
        state = self._init_func_flow(func_flow, call_func)

        def _func_flow_runner_call_func(*args: object, **kwargs: object) -> Any:
            self._register_job_state(func_flow, RunState.RUNNING)
            with func_flow.flow_context:
                flow_result = self._run_func_flow(state, func_flow, *args, **kwargs)
                return self._decorate_result_with_job_finalization_detector(
                    func_flow, flow_result)

        return _func_flow_runner_call_func

    job_callback_accept_decorator(_func_flow_decorator)

get_config_cls abstractmethod classmethod

get_config_cls() -> Type[IsJobRunnerConfig]

Specification of config class mapped to an Engine subclass. Must be implemented by all subclasses of Engine. If no configuration is needed, then the EngineConfig class should be returned. :return: Class implementing the IsEngineConfig protocol

Source code in src/omnipy/engine/_base.py
@classmethod
@abstractmethod
def get_config_cls(cls) -> Type[IsJobRunnerConfig]:
    """
    Specification of config class mapped to an Engine subclass. Must be implemented by all
    subclasses of Engine. If no configuration is needed, then the EngineConfig class should be
    returned.
    :return: Class implementing the IsEngineConfig protocol
    """

set_config

set_config(config: IsJobRunnerConfig) -> None
Source code in src/omnipy/engine/_base.py
def set_config(self, config: IsJobRunnerConfig) -> None:
    self._config = config
    self._update_from_config()

set_registry

set_registry(registry: IsRunStateRegistry | None) -> None
Source code in src/omnipy/engine/_base.py
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
    self._registry = registry

JobRunnerEngine

Bases: Engine, ABC

Base class for job runner engine implementations

METHOD DESCRIPTION
__init__
get_config_cls

Specification of config class mapped to an Engine subclass. Must be implemented by all

set_config
set_registry
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobRunnerConfig

registry

TYPE: IsRunStateRegistry | None

Source code in src/omnipy/engine/job_runner.py
class JobRunnerEngine(Engine, ABC):
    """Base class for job runner engine implementations"""
    def _register_job_state(self, job: IsJob, state: RunState.Literals) -> None:
        if self._registry:
            self._registry.set_job_state(job, state)

    def _decorate_result_with_job_finalization_detector(  # noqa: C901
            self, job: IsJob, job_result: object):
        # TODO: Simplify _decorate_result_with_job_finalization_detector()
        if isinstance(job_result, GeneratorType):
            job_result = cast(GeneratorType, job_result)

            def detect_finished_generator_decorator():
                try:
                    value = yield next(job_result)
                    while True:
                        value = yield job_result.send(value)
                except StopIteration:
                    self._register_job_state(job, RunState.FINISHED)

            return detect_finished_generator_decorator()
        elif isinstance(job_result, AsyncGeneratorType):
            job_result = cast(AsyncGeneratorType, job_result)

            async def detect_finished_async_generator_decorator():
                try:
                    if sys.version_info >= (3, 10):
                        value = yield await anext(job_result)
                    else:
                        value = yield await job_result.__anext__()
                    while True:
                        value = yield await job_result.asend(value)
                except StopAsyncIteration:
                    self._register_job_state(job, RunState.FINISHED)

            return detect_finished_async_generator_decorator()

        elif inspect.isawaitable(job_result):
            job_result = cast(Awaitable, job_result)

            async def detect_finished_coroutine():
                result = await job_result
                self._register_job_state(job, RunState.FINISHED)
                return result

            return detect_finished_coroutine()
        else:
            self._register_job_state(job, RunState.FINISHED)
            return job_result

config property

registry property

registry: IsRunStateRegistry | None

__init__

__init__() -> None
Source code in src/omnipy/engine/_base.py
def __init__(self) -> None:
    config_cls = self.get_config_cls()
    self._config: IsJobRunnerConfig = config_cls()
    self._registry: IsRunStateRegistry | None = None

    self._init_engine()

get_config_cls abstractmethod classmethod

get_config_cls() -> Type[IsJobRunnerConfig]

Specification of config class mapped to an Engine subclass. Must be implemented by all subclasses of Engine. If no configuration is needed, then the EngineConfig class should be returned. :return: Class implementing the IsEngineConfig protocol

Source code in src/omnipy/engine/_base.py
@classmethod
@abstractmethod
def get_config_cls(cls) -> Type[IsJobRunnerConfig]:
    """
    Specification of config class mapped to an Engine subclass. Must be implemented by all
    subclasses of Engine. If no configuration is needed, then the EngineConfig class should be
    returned.
    :return: Class implementing the IsEngineConfig protocol
    """

set_config

set_config(config: IsJobRunnerConfig) -> None
Source code in src/omnipy/engine/_base.py
def set_config(self, config: IsJobRunnerConfig) -> None:
    self._config = config
    self._update_from_config()

set_registry

set_registry(registry: IsRunStateRegistry | None) -> None
Source code in src/omnipy/engine/_base.py
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
    self._registry = registry

LinearFlowRunnerEngine

Bases: JobRunnerEngine

Base class for linear flow runner engine implementations

METHOD DESCRIPTION
__init__
apply_linear_flow_decorator
default_linear_flow_run_decorator
get_config_cls

Specification of config class mapped to an Engine subclass. Must be implemented by all

set_config
set_registry
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobRunnerConfig

registry

TYPE: IsRunStateRegistry | None

Source code in src/omnipy/engine/job_runner.py
class LinearFlowRunnerEngine(JobRunnerEngine):
    """Base class for linear flow runner engine implementations"""
    def apply_linear_flow_decorator(self,
                                    linear_flow: IsLinearFlow,
                                    job_callback_accept_decorator: Callable) -> None:
        def _linear_flow_decorator(call_func: Callable) -> Callable:
            self._register_job_state(linear_flow, RunState.INITIALIZED)
            state = self._init_linear_flow(linear_flow)

            def _linear_flow_runner_call_func(*args: object, **kwargs: object) -> Any:
                self._register_job_state(linear_flow, RunState.RUNNING)
                flow_result = self._run_linear_flow(state, linear_flow, *args, **kwargs)
                return self._decorate_result_with_job_finalization_detector(
                    linear_flow, flow_result)

            return _linear_flow_runner_call_func

        job_callback_accept_decorator(_linear_flow_decorator)

    @staticmethod
    def default_linear_flow_run_decorator(linear_flow: IsLinearFlow) -> Callable:
        def _inner_run_linear_flow(*args: object, **kwargs: object):

            result = None
            with linear_flow.flow_context:
                for i, job in enumerate(linear_flow.task_templates):
                    # TODO: Better handling of kwargs
                    if i == 0:
                        result = job(*args, **kwargs)
                    else:
                        result = job(*args)

                    args = (result,)
            return result

        return _inner_run_linear_flow

    @abstractmethod
    def _init_linear_flow(self, linear_flow: IsLinearFlow) -> Any:
        ...

    @abstractmethod
    def _run_linear_flow(self, state: Any, linear_flow: IsLinearFlow, *args, **kwargs) -> Any:
        ...

config property

registry property

registry: IsRunStateRegistry | None

__init__

__init__() -> None
Source code in src/omnipy/engine/_base.py
def __init__(self) -> None:
    config_cls = self.get_config_cls()
    self._config: IsJobRunnerConfig = config_cls()
    self._registry: IsRunStateRegistry | None = None

    self._init_engine()

apply_linear_flow_decorator

apply_linear_flow_decorator(
    linear_flow: IsLinearFlow, job_callback_accept_decorator: Callable
) -> None
Source code in src/omnipy/engine/job_runner.py
def apply_linear_flow_decorator(self,
                                linear_flow: IsLinearFlow,
                                job_callback_accept_decorator: Callable) -> None:
    def _linear_flow_decorator(call_func: Callable) -> Callable:
        self._register_job_state(linear_flow, RunState.INITIALIZED)
        state = self._init_linear_flow(linear_flow)

        def _linear_flow_runner_call_func(*args: object, **kwargs: object) -> Any:
            self._register_job_state(linear_flow, RunState.RUNNING)
            flow_result = self._run_linear_flow(state, linear_flow, *args, **kwargs)
            return self._decorate_result_with_job_finalization_detector(
                linear_flow, flow_result)

        return _linear_flow_runner_call_func

    job_callback_accept_decorator(_linear_flow_decorator)

default_linear_flow_run_decorator staticmethod

default_linear_flow_run_decorator(linear_flow: IsLinearFlow) -> Callable
Source code in src/omnipy/engine/job_runner.py
@staticmethod
def default_linear_flow_run_decorator(linear_flow: IsLinearFlow) -> Callable:
    def _inner_run_linear_flow(*args: object, **kwargs: object):

        result = None
        with linear_flow.flow_context:
            for i, job in enumerate(linear_flow.task_templates):
                # TODO: Better handling of kwargs
                if i == 0:
                    result = job(*args, **kwargs)
                else:
                    result = job(*args)

                args = (result,)
        return result

    return _inner_run_linear_flow

get_config_cls abstractmethod classmethod

get_config_cls() -> Type[IsJobRunnerConfig]

Specification of config class mapped to an Engine subclass. Must be implemented by all subclasses of Engine. If no configuration is needed, then the EngineConfig class should be returned. :return: Class implementing the IsEngineConfig protocol

Source code in src/omnipy/engine/_base.py
@classmethod
@abstractmethod
def get_config_cls(cls) -> Type[IsJobRunnerConfig]:
    """
    Specification of config class mapped to an Engine subclass. Must be implemented by all
    subclasses of Engine. If no configuration is needed, then the EngineConfig class should be
    returned.
    :return: Class implementing the IsEngineConfig protocol
    """

set_config

set_config(config: IsJobRunnerConfig) -> None
Source code in src/omnipy/engine/_base.py
def set_config(self, config: IsJobRunnerConfig) -> None:
    self._config = config
    self._update_from_config()

set_registry

set_registry(registry: IsRunStateRegistry | None) -> None
Source code in src/omnipy/engine/_base.py
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
    self._registry = registry

TaskRunnerEngine

Bases: JobRunnerEngine

Base class for task runner engine implementations

METHOD DESCRIPTION
__init__
apply_task_decorator
get_config_cls

Specification of config class mapped to an Engine subclass. Must be implemented by all

set_config
set_registry
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobRunnerConfig

registry

TYPE: IsRunStateRegistry | None

Source code in src/omnipy/engine/job_runner.py
class TaskRunnerEngine(JobRunnerEngine):
    """Base class for task runner engine implementations"""
    def apply_task_decorator(self, task: IsTask, job_callback_accept_decorator: Callable) -> None:
        def _task_decorator(call_func: Callable) -> Callable:
            self._register_job_state(task, RunState.INITIALIZED)
            state = self._init_task(task, call_func)

            def _task_runner_call_func(*args: object, **kwargs: object) -> Any:
                self._register_job_state(task, RunState.RUNNING)
                task_result = self._run_task(state, task, call_func, *args, **kwargs)
                return self._decorate_result_with_job_finalization_detector(task, task_result)

            return _task_runner_call_func

        job_callback_accept_decorator(_task_decorator)

    @abstractmethod
    def _init_task(self, task: IsTask, call_func: Callable) -> Any:
        ...

    @abstractmethod
    def _run_task(self, state: Any, task: IsTask, call_func: Callable, *args, **kwargs) -> Any:
        ...

config property

registry property

registry: IsRunStateRegistry | None

__init__

__init__() -> None
Source code in src/omnipy/engine/_base.py
def __init__(self) -> None:
    config_cls = self.get_config_cls()
    self._config: IsJobRunnerConfig = config_cls()
    self._registry: IsRunStateRegistry | None = None

    self._init_engine()

apply_task_decorator

apply_task_decorator(task: IsTask, job_callback_accept_decorator: Callable) -> None
Source code in src/omnipy/engine/job_runner.py
def apply_task_decorator(self, task: IsTask, job_callback_accept_decorator: Callable) -> None:
    def _task_decorator(call_func: Callable) -> Callable:
        self._register_job_state(task, RunState.INITIALIZED)
        state = self._init_task(task, call_func)

        def _task_runner_call_func(*args: object, **kwargs: object) -> Any:
            self._register_job_state(task, RunState.RUNNING)
            task_result = self._run_task(state, task, call_func, *args, **kwargs)
            return self._decorate_result_with_job_finalization_detector(task, task_result)

        return _task_runner_call_func

    job_callback_accept_decorator(_task_decorator)

get_config_cls abstractmethod classmethod

get_config_cls() -> Type[IsJobRunnerConfig]

Specification of config class mapped to an Engine subclass. Must be implemented by all subclasses of Engine. If no configuration is needed, then the EngineConfig class should be returned. :return: Class implementing the IsEngineConfig protocol

Source code in src/omnipy/engine/_base.py
@classmethod
@abstractmethod
def get_config_cls(cls) -> Type[IsJobRunnerConfig]:
    """
    Specification of config class mapped to an Engine subclass. Must be implemented by all
    subclasses of Engine. If no configuration is needed, then the EngineConfig class should be
    returned.
    :return: Class implementing the IsEngineConfig protocol
    """

set_config

set_config(config: IsJobRunnerConfig) -> None
Source code in src/omnipy/engine/_base.py
def set_config(self, config: IsJobRunnerConfig) -> None:
    self._config = config
    self._update_from_config()

set_registry

set_registry(registry: IsRunStateRegistry | None) -> None
Source code in src/omnipy/engine/_base.py
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
    self._registry = registry