Skip to content

omnipy.engine.local

CLASS DESCRIPTION
LocalRunner

Local job runner

LocalRunner

Bases: TaskRunnerEngine, LinearFlowRunnerEngine, DagFlowRunnerEngine, FuncFlowRunnerEngine

Local job runner

METHOD DESCRIPTION
__init__
apply_dag_flow_decorator
apply_func_flow_decorator
apply_linear_flow_decorator
apply_task_decorator
default_dag_flow_run_decorator
default_linear_flow_run_decorator
get_config_cls
set_config
set_registry
ATTRIBUTE DESCRIPTION
config

TYPE: IsJobRunnerConfig

registry

TYPE: IsRunStateRegistry | None

Source code in src/omnipy/engine/local.py
class LocalRunner(TaskRunnerEngine,
                  LinearFlowRunnerEngine,
                  DagFlowRunnerEngine,
                  FuncFlowRunnerEngine):
    """Local job runner"""
    def _init_engine(self) -> None:
        ...

    def _update_from_config(self) -> None:
        ...

    @classmethod
    def get_config_cls(cls) -> Type[IsLocalRunnerConfig]:
        return LocalRunnerConfig

    def _init_task(self, task: IsTask, call_func: Callable) -> Any:
        ...

    def _run_task(self, state: Any, task: IsTask, call_func: Callable, *args, **kwargs) -> Any:
        return call_func(*args, **kwargs)

    def _init_linear_flow(self, flow: IsLinearFlow) -> Any:
        ...

    def _run_linear_flow(self, state: Any, flow: IsLinearFlow, *args, **kwargs) -> Any:
        return self.default_linear_flow_run_decorator(flow)(*args, **kwargs)

    def _init_dag_flow(self, flow: IsDagFlow) -> Any:
        ...

    def _run_dag_flow(self, state: Any, flow: IsDagFlow, *args, **kwargs) -> Any:
        return self.default_dag_flow_run_decorator(flow)(*args, **kwargs)

    def _init_func_flow(self, func_flow: IsFuncFlow, call_func: Callable) -> object:
        return call_func

    def _run_func_flow(self, state: Any, func_flow: IsFuncFlow, *args, **kwargs) -> Any:
        call_func = state
        with func_flow.flow_context:
            return call_func(*args, **kwargs)

config property

registry property

registry: IsRunStateRegistry | None

__init__

__init__() -> None
Source code in src/omnipy/engine/_base.py
def __init__(self) -> None:
    config_cls = self.get_config_cls()
    self._config: IsJobRunnerConfig = config_cls()
    self._registry: IsRunStateRegistry | None = None

    self._init_engine()

apply_dag_flow_decorator

apply_dag_flow_decorator(dag_flow: IsDagFlow, job_callback_accept_decorator: Callable) -> None
Source code in src/omnipy/engine/job_runner.py
def apply_dag_flow_decorator(self, dag_flow: IsDagFlow,
                             job_callback_accept_decorator: Callable) -> None:
    def _dag_flow_decorator(call_func: Callable) -> Callable:
        self._register_job_state(dag_flow, RunState.INITIALIZED)
        state = self._init_dag_flow(dag_flow)

        def _dag_flow_runner_call_func(*args: object, **kwargs: object) -> Any:
            self._register_job_state(dag_flow, RunState.RUNNING)
            flow_result = self._run_dag_flow(state, dag_flow, *args, **kwargs)
            return self._decorate_result_with_job_finalization_detector(dag_flow, flow_result)

        return _dag_flow_runner_call_func

    job_callback_accept_decorator(_dag_flow_decorator)

apply_func_flow_decorator

apply_func_flow_decorator(func_flow: IsFuncFlow, job_callback_accept_decorator: Callable) -> None
Source code in src/omnipy/engine/job_runner.py
def apply_func_flow_decorator(self,
                              func_flow: IsFuncFlow,
                              job_callback_accept_decorator: Callable) -> None:
    def _func_flow_decorator(call_func: Callable) -> Callable:
        self._register_job_state(func_flow, RunState.INITIALIZED)
        state = self._init_func_flow(func_flow, call_func)

        def _func_flow_runner_call_func(*args: object, **kwargs: object) -> Any:
            self._register_job_state(func_flow, RunState.RUNNING)
            with func_flow.flow_context:
                flow_result = self._run_func_flow(state, func_flow, *args, **kwargs)
                return self._decorate_result_with_job_finalization_detector(
                    func_flow, flow_result)

        return _func_flow_runner_call_func

    job_callback_accept_decorator(_func_flow_decorator)

apply_linear_flow_decorator

apply_linear_flow_decorator(
    linear_flow: IsLinearFlow, job_callback_accept_decorator: Callable
) -> None
Source code in src/omnipy/engine/job_runner.py
def apply_linear_flow_decorator(self,
                                linear_flow: IsLinearFlow,
                                job_callback_accept_decorator: Callable) -> None:
    def _linear_flow_decorator(call_func: Callable) -> Callable:
        self._register_job_state(linear_flow, RunState.INITIALIZED)
        state = self._init_linear_flow(linear_flow)

        def _linear_flow_runner_call_func(*args: object, **kwargs: object) -> Any:
            self._register_job_state(linear_flow, RunState.RUNNING)
            flow_result = self._run_linear_flow(state, linear_flow, *args, **kwargs)
            return self._decorate_result_with_job_finalization_detector(
                linear_flow, flow_result)

        return _linear_flow_runner_call_func

    job_callback_accept_decorator(_linear_flow_decorator)

apply_task_decorator

apply_task_decorator(task: IsTask, job_callback_accept_decorator: Callable) -> None
Source code in src/omnipy/engine/job_runner.py
def apply_task_decorator(self, task: IsTask, job_callback_accept_decorator: Callable) -> None:
    def _task_decorator(call_func: Callable) -> Callable:
        self._register_job_state(task, RunState.INITIALIZED)
        state = self._init_task(task, call_func)

        def _task_runner_call_func(*args: object, **kwargs: object) -> Any:
            self._register_job_state(task, RunState.RUNNING)
            task_result = self._run_task(state, task, call_func, *args, **kwargs)
            return self._decorate_result_with_job_finalization_detector(task, task_result)

        return _task_runner_call_func

    job_callback_accept_decorator(_task_decorator)

default_dag_flow_run_decorator staticmethod

default_dag_flow_run_decorator(dag_flow: IsDagFlow) -> Callable
Source code in src/omnipy/engine/job_runner.py
@staticmethod
def default_dag_flow_run_decorator(dag_flow: IsDagFlow) -> Callable:  # noqa: C901
    def _inner_run_dag_flow(*args: object, **kwargs: object):
        results = {}
        result = None
        with dag_flow.flow_context:
            for i, job in enumerate(dag_flow.task_templates):
                if i == 0:
                    results = dag_flow.get_bound_args(*args, **kwargs).arguments

                param_keys = set(inspect.signature(job).parameters.keys())

                # TODO: Refactor to remove dependency
                #       Also, add test for not allowing override of fixed_params
                if hasattr(job, 'param_key_map'):
                    for key, val in job.param_key_map.items():
                        if key in param_keys:
                            param_keys.remove(key)
                            param_keys.add(val)

                if hasattr(job, 'fixed_params'):
                    for key in job.fixed_params.keys():
                        if key in param_keys:
                            param_keys.remove(key)

                params = {key: val for key, val in results.items() if key in param_keys}
                result = job(**params)

                if isinstance(result, dict) and len(result) > 0:
                    results.update(result)
                else:
                    results[job.name] = result
        return result

    return _inner_run_dag_flow

default_linear_flow_run_decorator staticmethod

default_linear_flow_run_decorator(linear_flow: IsLinearFlow) -> Callable
Source code in src/omnipy/engine/job_runner.py
@staticmethod
def default_linear_flow_run_decorator(linear_flow: IsLinearFlow) -> Callable:
    def _inner_run_linear_flow(*args: object, **kwargs: object):

        result = None
        with linear_flow.flow_context:
            for i, job in enumerate(linear_flow.task_templates):
                # TODO: Better handling of kwargs
                if i == 0:
                    result = job(*args, **kwargs)
                else:
                    result = job(*args)

                args = (result,)
        return result

    return _inner_run_linear_flow

get_config_cls classmethod

get_config_cls() -> Type[IsLocalRunnerConfig]
Source code in src/omnipy/engine/local.py
@classmethod
def get_config_cls(cls) -> Type[IsLocalRunnerConfig]:
    return LocalRunnerConfig

set_config

set_config(config: IsJobRunnerConfig) -> None
Source code in src/omnipy/engine/_base.py
def set_config(self, config: IsJobRunnerConfig) -> None:
    self._config = config
    self._update_from_config()

set_registry

set_registry(registry: IsRunStateRegistry | None) -> None
Source code in src/omnipy/engine/_base.py
def set_registry(self, registry: IsRunStateRegistry | None) -> None:
    self._registry = registry