Skip to content

omnipy.components.remote.helpers

CLASS DESCRIPTION
RateLimitingClientSession

A ClientSession that limits the number of requests made per time period, allowing an initial

ATTRIBUTE DESCRIPTION
BACKOFF_STRATEGY_2_RETRY_CLS

BACKOFF_STRATEGY_2_RETRY_CLS module-attribute

BACKOFF_STRATEGY_2_RETRY_CLS = {
    BackoffStrategy.EXPONENTIAL: ExponentialRetry,
    BackoffStrategy.JITTER: JitterRetry,
    BackoffStrategy.FIBONACCI: FibonacciRetry,
    BackoffStrategy.RANDOM: RandomRetry,
}

RateLimitingClientSession

Bases: ClientSession

A ClientSession that limits the number of requests made per time period, allowing an initial burst of requests to go through before rate limiting kicks in for the rest.

METHOD DESCRIPTION
__init__
ATTRIBUTE DESCRIPTION
requests_per_second

TYPE: float

Source code in src/omnipy/components/remote/helpers.py
class RateLimitingClientSession(ClientSession):
    """
    A ClientSession that limits the number of requests made per time period, allowing an initial
    burst of requests to go through before rate limiting kicks in for the rest.
    """
    def __init__(self, requests_per_time_period: float, time_period_in_secs: float, *args,
                 **kwargs) -> None:
        from .lazy_import import AsyncLimiter

        trace_config = TraceConfig()
        trace_config.on_request_start.append(self._limit_request)
        super().__init__(*args, trace_configs=[trace_config], **kwargs)

        self._requests_per_time_period = requests_per_time_period
        self._time_period_in_secs = time_period_in_secs

        self._limiter = AsyncLimiter(self._requests_per_time_period, self._time_period_in_secs)

        # To allow for an initial burst of requests of size `requests_per_time_period / 2 + 1`
        # to go through before rate limiting kicks in for the rest
        self._burst_size = self._requests_per_time_period / 2 + 1
        self._limiter.max_rate = self._burst_size
        self._cur_delay_secs: float = 0
        self._num_requests: int = 0
        self._first_submit_time: datetime | None = None

    async def _limit_request(self, *args, **kwargs):

        request_num = self._num_requests
        self._num_requests += 1

        submit_time = datetime.now()
        if self._first_submit_time is None:
            self._first_submit_time = submit_time

        await self._limiter.acquire()

        submit_time_delta = (submit_time - self._first_submit_time).total_seconds()
        if submit_time_delta > self._time_period_in_secs:
            # Resetting the rate limiter for the next batch of requests
            self._first_submit_time = submit_time
            self._cur_delay_secs = 0
            self._num_requests = 1
            request_num = 0

        # Adding a delay to compensate for the filling of the leaky bucket, which is not accounted
        # for in the AsyncLimiter. This is to allow for an initial burst of requests to go through
        # while not exceeding the rate limit for the first time period
        if self._burst_size <= request_num <= self._requests_per_time_period:
            self._cur_delay_secs += 1 / self.requests_per_second

        if self._cur_delay_secs > 0:
            await asyncio.sleep(self._cur_delay_secs)

        # print(f'Request number: {request_num}, Actual request time: {datetime.now()}')

    @property
    def requests_per_second(self) -> float:
        return self._requests_per_time_period / self._time_period_in_secs

    async def __aenter__(self) -> 'RateLimitingClientSession':
        return cast(RateLimitingClientSession, await super().__aenter__())

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        await super().__aexit__(exc_type, exc_val, exc_tb)

requests_per_second property

requests_per_second: float

__init__

__init__(requests_per_time_period: float, time_period_in_secs: float, *args, **kwargs) -> None
Source code in src/omnipy/components/remote/helpers.py
def __init__(self, requests_per_time_period: float, time_period_in_secs: float, *args,
             **kwargs) -> None:
    from .lazy_import import AsyncLimiter

    trace_config = TraceConfig()
    trace_config.on_request_start.append(self._limit_request)
    super().__init__(*args, trace_configs=[trace_config], **kwargs)

    self._requests_per_time_period = requests_per_time_period
    self._time_period_in_secs = time_period_in_secs

    self._limiter = AsyncLimiter(self._requests_per_time_period, self._time_period_in_secs)

    # To allow for an initial burst of requests of size `requests_per_time_period / 2 + 1`
    # to go through before rate limiting kicks in for the rest
    self._burst_size = self._requests_per_time_period / 2 + 1
    self._limiter.max_rate = self._burst_size
    self._cur_delay_secs: float = 0
    self._num_requests: int = 0
    self._first_submit_time: datetime | None = None