Module jmcore.rate_limiter

Per-peer rate limiting using token bucket algorithm.

Prevents DoS attacks by limiting the message rate from each connected peer. This module provides a generic rate limiter that can be used across all JoinMarket components (directory server, makers, takers).

Design: - Token bucket algorithm allows generous burst capacity with low sustained rate - Default: 10 msg/sec sustained, 100 msg burst (allows ~10s of max-rate traffic) - Per-peer tracking prevents one bad actor from affecting others - Automatic cleanup prevents memory leaks

Security considerations: - Rate limiting should be keyed by connection ID, not self-declared nick - Nick-based tracking is vulnerable to impersonation attacks - A malicious peer could claim another's nick and trigger rate limits

Classes

class RateLimitAction (*values)
Expand source code
class RateLimitAction(Enum):
    """Action to take when rate limit is exceeded."""

    ALLOW = "allow"  # Message is allowed
    DELAY = "delay"  # Message should be delayed/dropped but connection stays
    DISCONNECT = "disconnect"  # Disconnect the peer (severe abuse)

Action to take when rate limit is exceeded.

Ancestors

  • enum.Enum

Class variables

var ALLOW

The type of the None singleton.

var DELAY

The type of the None singleton.

var DISCONNECT

The type of the None singleton.

class RateLimiter (rate_limit: int = 10,
burst_limit: int | None = None,
disconnect_threshold: int | None = None)
Expand source code
class RateLimiter:
    """
    Per-peer rate limiter using token bucket algorithm.

    Configuration:
    - rate_limit: messages per second (sustained rate)
    - burst_limit: maximum burst size (default: 10x rate_limit)
    - disconnect_threshold: violations before disconnect (default: None = never)

    Default settings (10 msg/sec sustained, 100 msg burst):
    - Allows ~10 seconds of continuous max-rate traffic before throttling
    - Prevents DoS from rapid spam while allowing legitimate burst patterns
    - Example: taker requesting orderbook from multiple makers simultaneously

    Security:
    - Rate limit by connection ID, not self-declared nick, to prevent impersonation
    - Nick spoofing attack: attacker claims victim's nick to get them rate limited
    - Use connection-based keys until identity is cryptographically verified
    """

    @validate_call
    def __init__(
        self,
        rate_limit: int = 10,
        burst_limit: int | None = None,
        disconnect_threshold: int | None = None,
    ):
        """
        Initialize rate limiter.

        Args:
            rate_limit: Maximum messages per second (sustained, default: 10)
            burst_limit: Maximum burst size (default: 10x rate_limit = 100)
            disconnect_threshold: Violations before disconnect (None = never disconnect)
        """
        self.rate_limit = rate_limit
        self.burst_limit = burst_limit or (rate_limit * 10)
        self.disconnect_threshold = disconnect_threshold
        self._buckets: dict[str, TokenBucket] = {}
        self._violation_counts: dict[str, int] = {}

    def check(self, peer_key: str) -> tuple[RateLimitAction, float]:
        """
        Check rate limit and return recommended action.

        Returns:
            Tuple of (action, delay_seconds):
            - ALLOW: Message allowed, delay=0
            - DELAY: Message should be delayed/dropped, delay=recommended wait time
            - DISCONNECT: Peer should be disconnected (severe abuse), delay=0
        """
        if peer_key not in self._buckets:
            self._buckets[peer_key] = TokenBucket(
                capacity=self.burst_limit,
                refill_rate=float(self.rate_limit),
            )

        bucket = self._buckets[peer_key]
        allowed = bucket.consume()

        if allowed:
            return (RateLimitAction.ALLOW, 0.0)

        # Rate limited - increment violation count
        self._violation_counts[peer_key] = self._violation_counts.get(peer_key, 0) + 1
        violations = self._violation_counts[peer_key]

        # Check if we should disconnect (only if threshold is set)
        if self.disconnect_threshold is not None and violations >= self.disconnect_threshold:
            return (RateLimitAction.DISCONNECT, 0.0)

        # Otherwise, recommend delay
        delay = bucket.get_delay_seconds()
        return (RateLimitAction.DELAY, delay)

    def remove_peer(self, peer_key: str) -> None:
        """Remove rate limit state for a disconnected peer."""
        self._buckets.pop(peer_key, None)
        self._violation_counts.pop(peer_key, None)

    def get_violation_count(self, peer_key: str) -> int:
        """Get the number of rate limit violations for a peer."""
        return self._violation_counts.get(peer_key, 0)

    def get_delay_for_peer(self, peer_key: str) -> float:
        """Get recommended delay in seconds for a rate-limited peer."""
        bucket = self._buckets.get(peer_key)
        if bucket is None:
            return 0.0
        return bucket.get_delay_seconds()

    def cleanup_old_peers(self, max_idle_seconds: float = 3600.0) -> int:
        """
        Remove peers that haven't sent messages in max_idle_seconds.

        Returns the number of peers removed.
        """
        now = time.monotonic()
        stale_peers = [
            peer_key
            for peer_key, bucket in self._buckets.items()
            if now - bucket.last_refill > max_idle_seconds
        ]

        for peer_key in stale_peers:
            self.remove_peer(peer_key)

        return len(stale_peers)

    def get_stats(self) -> dict:
        """Get rate limiter statistics."""
        return {
            "tracked_peers": len(self._buckets),
            "total_violations": sum(self._violation_counts.values()),
            "top_violators": sorted(
                self._violation_counts.items(),
                key=lambda x: x[1],
                reverse=True,
            )[:10],
        }

    def clear(self) -> None:
        """Clear all rate limit state."""
        self._buckets.clear()
        self._violation_counts.clear()

Per-peer rate limiter using token bucket algorithm.

Configuration: - rate_limit: messages per second (sustained rate) - burst_limit: maximum burst size (default: 10x rate_limit) - disconnect_threshold: violations before disconnect (default: None = never)

Default settings (10 msg/sec sustained, 100 msg burst): - Allows ~10 seconds of continuous max-rate traffic before throttling - Prevents DoS from rapid spam while allowing legitimate burst patterns - Example: taker requesting orderbook from multiple makers simultaneously

Security: - Rate limit by connection ID, not self-declared nick, to prevent impersonation - Nick spoofing attack: attacker claims victim's nick to get them rate limited - Use connection-based keys until identity is cryptographically verified

Initialize rate limiter.

Args

rate_limit
Maximum messages per second (sustained, default: 10)
burst_limit
Maximum burst size (default: 10x rate_limit = 100)
disconnect_threshold
Violations before disconnect (None = never disconnect)

Methods

def check(self, peer_key: str) ‑> tuple[RateLimitAction, float]
Expand source code
def check(self, peer_key: str) -> tuple[RateLimitAction, float]:
    """
    Check rate limit and return recommended action.

    Returns:
        Tuple of (action, delay_seconds):
        - ALLOW: Message allowed, delay=0
        - DELAY: Message should be delayed/dropped, delay=recommended wait time
        - DISCONNECT: Peer should be disconnected (severe abuse), delay=0
    """
    if peer_key not in self._buckets:
        self._buckets[peer_key] = TokenBucket(
            capacity=self.burst_limit,
            refill_rate=float(self.rate_limit),
        )

    bucket = self._buckets[peer_key]
    allowed = bucket.consume()

    if allowed:
        return (RateLimitAction.ALLOW, 0.0)

    # Rate limited - increment violation count
    self._violation_counts[peer_key] = self._violation_counts.get(peer_key, 0) + 1
    violations = self._violation_counts[peer_key]

    # Check if we should disconnect (only if threshold is set)
    if self.disconnect_threshold is not None and violations >= self.disconnect_threshold:
        return (RateLimitAction.DISCONNECT, 0.0)

    # Otherwise, recommend delay
    delay = bucket.get_delay_seconds()
    return (RateLimitAction.DELAY, delay)

Check rate limit and return recommended action.

Returns

Tuple of (action, delay_seconds): - ALLOW: Message allowed, delay=0 - DELAY: Message should be delayed/dropped, delay=recommended wait time - DISCONNECT: Peer should be disconnected (severe abuse), delay=0

def cleanup_old_peers(self, max_idle_seconds: float = 3600.0) ‑> int
Expand source code
def cleanup_old_peers(self, max_idle_seconds: float = 3600.0) -> int:
    """
    Remove peers that haven't sent messages in max_idle_seconds.

    Returns the number of peers removed.
    """
    now = time.monotonic()
    stale_peers = [
        peer_key
        for peer_key, bucket in self._buckets.items()
        if now - bucket.last_refill > max_idle_seconds
    ]

    for peer_key in stale_peers:
        self.remove_peer(peer_key)

    return len(stale_peers)

Remove peers that haven't sent messages in max_idle_seconds.

Returns the number of peers removed.

def clear(self) ‑> None
Expand source code
def clear(self) -> None:
    """Clear all rate limit state."""
    self._buckets.clear()
    self._violation_counts.clear()

Clear all rate limit state.

def get_delay_for_peer(self, peer_key: str) ‑> float
Expand source code
def get_delay_for_peer(self, peer_key: str) -> float:
    """Get recommended delay in seconds for a rate-limited peer."""
    bucket = self._buckets.get(peer_key)
    if bucket is None:
        return 0.0
    return bucket.get_delay_seconds()

Get recommended delay in seconds for a rate-limited peer.

def get_stats(self) ‑> dict
Expand source code
def get_stats(self) -> dict:
    """Get rate limiter statistics."""
    return {
        "tracked_peers": len(self._buckets),
        "total_violations": sum(self._violation_counts.values()),
        "top_violators": sorted(
            self._violation_counts.items(),
            key=lambda x: x[1],
            reverse=True,
        )[:10],
    }

Get rate limiter statistics.

def get_violation_count(self, peer_key: str) ‑> int
Expand source code
def get_violation_count(self, peer_key: str) -> int:
    """Get the number of rate limit violations for a peer."""
    return self._violation_counts.get(peer_key, 0)

Get the number of rate limit violations for a peer.

def remove_peer(self, peer_key: str) ‑> None
Expand source code
def remove_peer(self, peer_key: str) -> None:
    """Remove rate limit state for a disconnected peer."""
    self._buckets.pop(peer_key, None)
    self._violation_counts.pop(peer_key, None)

Remove rate limit state for a disconnected peer.

class TokenBucket (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class TokenBucket:
    """
    Token bucket for rate limiting.

    Tokens are added at a fixed rate up to a maximum capacity.
    Each message consumes one token. If no tokens are available,
    the message is rejected.
    """

    capacity: int  # Maximum tokens (burst allowance)
    refill_rate: float  # Tokens per second
    tokens: float = Field(init=False)
    last_refill: float = Field(init=False)

    def __post_init__(self) -> None:
        self.tokens = float(self.capacity)
        self.last_refill = time.monotonic()

    def consume(self, tokens: int = 1) -> bool:
        """
        Try to consume tokens. Returns True if successful, False if rate limited.
        """
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.last_refill = now

        # Refill tokens based on elapsed time
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)

        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def get_delay_seconds(self) -> float:
        """
        Get recommended delay in seconds before next message would be allowed.
        Returns 0 if tokens are available.
        """
        if self.tokens >= 1:
            return 0.0
        # Calculate time needed to refill 1 token
        tokens_needed = 1 - self.tokens
        return tokens_needed / self.refill_rate

    def reset(self) -> None:
        """Reset bucket to full capacity."""
        self.tokens = float(self.capacity)
        self.last_refill = time.monotonic()

Token bucket for rate limiting.

Tokens are added at a fixed rate up to a maximum capacity. Each message consumes one token. If no tokens are available, the message is rejected.

Instance variables

var capacity : int

The type of the None singleton.

var last_refill : float

The type of the None singleton.

var refill_rate : float

The type of the None singleton.

var tokens : float

The type of the None singleton.

Methods

def consume(self, tokens: int = 1) ‑> bool
Expand source code
def consume(self, tokens: int = 1) -> bool:
    """
    Try to consume tokens. Returns True if successful, False if rate limited.
    """
    now = time.monotonic()
    elapsed = now - self.last_refill
    self.last_refill = now

    # Refill tokens based on elapsed time
    self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)

    if self.tokens >= tokens:
        self.tokens -= tokens
        return True
    return False

Try to consume tokens. Returns True if successful, False if rate limited.

def get_delay_seconds(self) ‑> float
Expand source code
def get_delay_seconds(self) -> float:
    """
    Get recommended delay in seconds before next message would be allowed.
    Returns 0 if tokens are available.
    """
    if self.tokens >= 1:
        return 0.0
    # Calculate time needed to refill 1 token
    tokens_needed = 1 - self.tokens
    return tokens_needed / self.refill_rate

Get recommended delay in seconds before next message would be allowed. Returns 0 if tokens are available.

def reset(self) ‑> None
Expand source code
def reset(self) -> None:
    """Reset bucket to full capacity."""
    self.tokens = float(self.capacity)
    self.last_refill = time.monotonic()

Reset bucket to full capacity.