Skip to content

Module omnipy.compute.mixins.flow_context

Overview

View Source
from datetime import datetime

from omnipy.api.protocols.private.compute.mixins import IsNestedContext

class FlowContextJobMixin:

    """"""

    def __init__(self) -> None:

        self._time_of_last_run = None

    @property

    def flow_context(self) -> IsNestedContext:

        class FlowContext:

            @classmethod

            def __enter__(cls):

                self.__class__.job_creator.__enter__()

                self._time_of_last_run = self.time_of_cur_toplevel_flow_run

            @classmethod

            def __exit__(cls, exc_type, exc_val, exc_tb):

                self.__class__.job_creator.__exit__(exc_type, exc_val, exc_tb)

        return FlowContext()

    @property

    def time_of_last_run(self) -> datetime | None:

        return self._time_of_last_run

Classes

FlowContextJobMixin

class FlowContextJobMixin()
View Source
class FlowContextJobMixin:

    """"""

    def __init__(self) -> None:

        self._time_of_last_run = None

    @property

    def flow_context(self) -> IsNestedContext:

        class FlowContext:

            @classmethod

            def __enter__(cls):

                self.__class__.job_creator.__enter__()

                self._time_of_last_run = self.time_of_cur_toplevel_flow_run

            @classmethod

            def __exit__(cls, exc_type, exc_val, exc_tb):

                self.__class__.job_creator.__exit__(exc_type, exc_val, exc_tb)

        return FlowContext()

    @property

    def time_of_last_run(self) -> datetime | None:

        return self._time_of_last_run

Instance variables

flow_context
time_of_last_run