Source code for src.bcra_connector.rate_limiter

"""
Rate limiting functionality for API requests.
"""

import time
from collections import deque
from dataclasses import dataclass
from threading import Lock
from typing import Deque, Optional


@dataclass
class RateLimitConfig:
    """Settings that describe how many requests are allowed and how fast.

    :param calls: Number of calls allowed per period
    :param period: Length of the period, in seconds
    :param _burst: Upper bound on calls tolerated in a burst (internal);
        falls back to ``calls`` when omitted
    :raises ValueError: If ``calls`` or ``period`` is not positive, or if an
        explicit ``_burst`` is smaller than ``calls``
    """

    calls: int
    period: float
    _burst: Optional[int] = None

    def __post_init__(self) -> None:
        """Check the supplied values and fill in the default burst limit."""
        if self.calls <= 0:
            raise ValueError("calls must be greater than 0")
        if self.period <= 0:
            raise ValueError("period must be greater than 0")

        if self._burst is None:
            # No explicit burst given: the burst limit equals the base rate.
            self._burst = self.calls
        elif self._burst < self.calls:
            raise ValueError("burst must be greater than or equal to calls")

    @property
    def burst(self) -> int:
        """Return the burst limit, which ``__post_init__`` has made an int."""
        limit = self._burst
        # Narrowing for type checkers; __post_init__ never leaves this None.
        assert limit is not None
        return limit
class RateLimiter:
    """Sliding-window rate limiter backed by a deque of request timestamps.

    Every call to :meth:`acquire` records a ``time.monotonic()`` timestamp.
    Timestamps older than ``config.period`` are discarded, and the number of
    timestamps still in the window determines whether a caller has to wait.
    All public methods take an internal lock before touching the window.
    """

    def __init__(self, config: "RateLimitConfig"):
        """Initialize the rate limiter.

        :param config: Rate limit configuration (provides ``calls``,
            ``period`` and ``burst``)
        """
        self.config = config
        # Timestamps of recorded requests, oldest on the left.
        self._window: Deque[float] = deque()
        self._lock = Lock()
        # Written here and in reset(); not read anywhere else in this class.
        self._last_check = time.monotonic()

    def _clean_old_timestamps(self) -> None:
        """Remove timestamps outside the current window.

        Must be called with ``self._lock`` held.
        """
        now = time.monotonic()
        # Entries are appended in time order, so expired ones sit on the left.
        while self._window and now - self._window[0] > self.config.period:
            self._window.popleft()

    def _get_delay(self) -> float:
        """Calculate the required delay before the next request.

        Must be called with ``self._lock`` held.

        :return: Required delay in seconds (``0.0`` when no wait is needed)
        """
        now = time.monotonic()
        self._clean_old_timestamps()

        in_window = len(self._window)
        if in_window < self.config.burst:
            return 0.0

        # Number of recorded requests above the base rate (``calls``).
        excess = max(0, in_window - self.config.calls)
        if excess > 0:
            # Spread the excess requests proportionally over the period,
            # measured from the oldest timestamp still in the window.
            next_available = self._window[0] + (
                self.config.period * excess / self.config.calls
            )
            return max(0.0, next_available - now)

        # Exactly at the limit: wait until the oldest entry leaves the window.
        return max(0.0, self._window[0] + self.config.period - now)

    def acquire(self) -> float:
        """Acquire permission to make a request, blocking if necessary.

        The request is recorded first and the calling thread then sleeps for
        the computed delay. The lock stays held while sleeping, so concurrent
        callers are served one after another.

        :return: Time spent waiting (in seconds)
        """
        with self._lock:
            delay = self._get_delay()

            # Record the request BEFORE waiting so its slot is reserved.
            self._window.append(time.monotonic())

            if delay > 0:
                time.sleep(delay)
                # Re-stamp the entry with the moment the request really
                # proceeds; it is still the newest entry because the lock
                # was held for the whole wait.
                self._window[-1] = time.monotonic()

            return delay

    def reset(self) -> None:
        """Reset the rate limiter state by dropping all recorded requests."""
        with self._lock:
            self._window.clear()
            self._last_check = time.monotonic()

    @property
    def current_usage(self) -> int:
        """Get the current number of requests in the window."""
        with self._lock:
            self._clean_old_timestamps()
            return len(self._window)

    @property
    def is_limited(self) -> bool:
        """Check if rate limit is currently being enforced.

        :return: ``True`` when the window holds at least ``burst`` requests,
            or at least ``calls`` requests with the oldest one still inside
            the period
        """
        with self._lock:
            self._clean_old_timestamps()
            current_size = len(self._window)
            return bool(
                current_size >= self.config.burst
                or (
                    current_size >= self.config.calls
                    and self._window
                    and (time.monotonic() - self._window[0]) <= self.config.period
                )
            )

    def remaining_calls(self) -> int:
        """Get the number of remaining calls allowed in the current window.

        Based on the base rate limit (calls), not the burst limit.

        :return: ``calls`` minus the requests currently in the window,
            never less than zero
        """
        with self._lock:
            self._clean_old_timestamps()
            return max(0, self.config.calls - len(self._window))