Module directory_server.message_router

Message routing logic for forwarding messages between peers.

Implements the Single Responsibility Principle: only handles message routing.

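A minimal usage sketch. The import paths for PeerRegistry, MessageEnvelope and MessageType, and the no-argument PeerRegistry constructor, are assumptions for illustration; the real server supplies its own transport through send_callback and its own cleanup through on_send_failed.

import asyncio

from directory_server.message_router import MessageRouter
# Illustrative import paths; these classes live elsewhere in the package.
from directory_server.peer_registry import PeerRegistry
from directory_server.protocol import MessageEnvelope, MessageType


async def main() -> None:
    registry = PeerRegistry()  # assumed no-argument constructor

    async def send(peer_key: str, data: bytes) -> None:
        # Stand-in for writing to the peer's open connection.
        print(f"would send {len(data)} bytes to {peer_key}")

    async def on_send_failed(peer_key: str) -> None:
        # Hook for the server to drop a dead connection.
        print(f"cleaning up {peer_key}")

    router = MessageRouter(registry, send, on_send_failed=on_send_failed)

    # A PING from any connection is answered with a PONG through send().
    ping = MessageEnvelope(message_type=MessageType.PING, payload="")
    await router.route_message(ping, from_key="peer-key-from-transport")


asyncio.run(main())
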
Classes

class MessageRouter(peer_registry: PeerRegistry,
                    send_callback: Callable[[str, bytes], Awaitable[None]],
                    broadcast_batch_size: int = 50,
                    on_send_failed: Callable[[str], Awaitable[None]] | None = None)
class MessageRouter:
    def __init__(
        self,
        peer_registry: PeerRegistry,
        send_callback: SendCallback,
        broadcast_batch_size: int = DEFAULT_BROADCAST_BATCH_SIZE,
        on_send_failed: FailedSendCallback | None = None,
    ):
        self.peer_registry = peer_registry
        self.send_callback = send_callback
        self.broadcast_batch_size = broadcast_batch_size
        self.on_send_failed = on_send_failed
        # Track peers that failed during current operation to avoid repeated attempts
        self._failed_peers: set[str] = set()
        # Track offers per peer (peer_key -> set of order IDs)
        self._peer_offers: dict[str, set[str]] = {}

    async def route_message(self, envelope: MessageEnvelope, from_key: str) -> None:
        if envelope.message_type == MessageType.PUBMSG:
            await self._handle_public_message(envelope, from_key)
        elif envelope.message_type == MessageType.PRIVMSG:
            await self._handle_private_message(envelope, from_key)
        elif envelope.message_type == MessageType.GETPEERLIST:
            await self._handle_peerlist_request(from_key)
        elif envelope.message_type == MessageType.PING:
            await self._handle_ping(from_key)
        else:
            logger.debug(f"Unhandled message type: {envelope.message_type}")

    async def _handle_public_message(self, envelope: MessageEnvelope, from_key: str) -> None:
        parsed = parse_jm_message(envelope.payload)
        if not parsed:
            logger.warning("Invalid public message format")
            return

        from_nick, to_nick, rest = parsed
        if to_nick != "PUBLIC":
            logger.warning(f"Public message not addressed to PUBLIC: {to_nick}")
            return

        from_peer = self.peer_registry.get_by_key(from_key)
        if not from_peer:
            logger.warning(f"Unknown peer sending public message: {from_key}")
            return

        # Track offers (!absorder, !absoffer, !reloffer, !relorder, and their sw0* variants)
        if rest:
            message_parts = rest.split()
            if (
                message_parts
                and message_parts[0]
                in (
                    "!absorder",
                    "!absoffer",
                    "!reloffer",
                    "!relorder",
                    "sw0absorder",
                    "sw0absoffer",
                    "sw0reloffer",
                    "sw0relorder",
                )
                and len(message_parts) >= 2
            ):
                # Extract order ID (second field in offer messages)
                try:
                    order_id = message_parts[1]
                    if from_key not in self._peer_offers:
                        self._peer_offers[from_key] = set()
                    self._peer_offers[from_key].add(order_id)
                    logger.trace(
                        f"Tracked offer {order_id} from {from_nick} "
                        f"(total offers: {len(self._peer_offers[from_key])})"
                    )
                except (ValueError, IndexError):
                    pass

        # Pre-serialize envelope once instead of per-peer
        envelope_bytes = envelope.to_bytes()

        # Use generator to avoid building full target list in memory
        def target_generator() -> Iterator[tuple[str, str | None]]:
            for peer in self.peer_registry.iter_connected(from_peer.network):
                peer_key = (
                    peer.nick
                    if peer.location_string == "NOT-SERVING-ONION"
                    else peer.location_string
                )
                if peer_key != from_key:
                    yield (peer_key, peer.nick)

        # Execute sends in batches to limit memory usage
        sent_count = await self._batched_broadcast_iter(target_generator(), envelope_bytes)

        logger.trace(f"Broadcasted public message from {from_nick} to {sent_count} peers")

    async def _safe_send(self, peer_key: str, data: bytes, nick: str | None = None) -> None:
        """Send with exception handling to prevent one failed send from affecting others."""
        # Skip if this peer already failed in current operation
        if peer_key in self._failed_peers:
            return

        try:
            await self.send_callback(peer_key, data)
        except Exception as e:
            logger.warning(f"Failed to send to {nick or peer_key}: {e}")
            # Mark peer as failed to prevent repeated attempts
            self._failed_peers.add(peer_key)
            # Notify server to clean up this peer
            if self.on_send_failed:
                try:
                    await self.on_send_failed(peer_key)
                except Exception as cleanup_err:
                    logger.trace(f"Error in on_send_failed callback: {cleanup_err}")

    async def _batched_broadcast(self, targets: list[tuple[str, str | None]], data: bytes) -> int:
        """
        Broadcast data to targets in batches to limit memory usage.

        Instead of creating all coroutines at once (which caused 2GB+ memory usage),
        we process in batches of broadcast_batch_size to keep memory bounded.

        Returns the number of targets processed.
        """
        return await self._batched_broadcast_iter(iter(targets), data)

    async def _batched_broadcast_iter(
        self, targets: Iterator[tuple[str, str | None]], data: bytes
    ) -> int:
        """
        Broadcast data to targets from an iterator in batches.

        This is the memory-efficient version that consumes targets lazily,
        only materializing batch_size items at a time.

        Returns the number of targets processed.
        """
        # Clear failed peers set at start of broadcast to allow fresh attempts
        # while still preventing repeated attempts within this broadcast
        self._failed_peers.clear()

        total_sent = 0
        batch: list[tuple[str, str | None]] = []

        for target in targets:
            peer_key, nick = target
            # Skip peers that have already failed in this broadcast
            if peer_key in self._failed_peers:
                continue
            batch.append(target)

            if len(batch) >= self.broadcast_batch_size:
                tasks = [self._safe_send(pk, data, n) for pk, n in batch]
                await asyncio.gather(*tasks)
                total_sent += len(batch)
                batch = []

        # Process remaining items
        if batch:
            tasks = [self._safe_send(pk, data, n) for pk, n in batch]
            await asyncio.gather(*tasks)
            total_sent += len(batch)

        return total_sent

    async def _handle_private_message(self, envelope: MessageEnvelope, from_key: str) -> None:
        parsed = parse_jm_message(envelope.payload)
        if not parsed:
            logger.warning("Invalid private message format")
            return

        from_nick, to_nick, rest = parsed
        logger.info(f"PRIVMSG routing: {from_nick} -> {to_nick} (rest: {rest[:50]}...)")

        to_peer = self.peer_registry.get_by_nick(to_nick)
        if not to_peer:
            logger.warning(f"Target peer not found: {to_nick}")
            # Log all registered peers for debugging
            all_peers = list(self.peer_registry._peers.keys())
            logger.info(f"Registered peer keys: {all_peers}")
            nick_map = dict(self.peer_registry._nick_to_key)
            logger.info(f"Nick to key map: {nick_map}")
            return

        from_peer = self.peer_registry.get_by_key(from_key)
        if not from_peer or from_peer.network != to_peer.network:
            logger.warning("Network mismatch or unknown sender")
            return

        try:
            to_peer_key = (
                to_peer.nick
                if to_peer.location_string == "NOT-SERVING-ONION"
                else to_peer.location_string
            )
            logger.info(f"Sending to peer_key: {to_peer_key}")
            await self.send_callback(to_peer_key, envelope.to_bytes())
            logger.info(f"Successfully routed private message: {from_nick} -> {to_nick}")

            await self._send_peer_location(to_peer_key, from_peer)
        except Exception as e:
            logger.warning(f"Failed to route private message to {to_nick}: {e}")
            # Notify server to clean up this peer's mapping
            if self.on_send_failed:
                to_peer_key = (
                    to_peer.nick
                    if to_peer.location_string == "NOT-SERVING-ONION"
                    else to_peer.location_string
                )
                with contextlib.suppress(Exception):
                    await self.on_send_failed(to_peer_key)

    async def _handle_peerlist_request(self, from_key: str) -> None:
        peer = self.peer_registry.get_by_key(from_key)
        if not peer:
            return

        # Check if requesting peer supports peerlist_features
        include_features = peer.features.get("peerlist_features", False)
        await self.send_peerlist(from_key, peer.network, include_features=include_features)

    async def _handle_ping(self, from_key: str) -> None:
        pong_envelope = MessageEnvelope(message_type=MessageType.PONG, payload="")
        try:
            await self.send_callback(from_key, pong_envelope.to_bytes())
            logger.trace(f"Sent PONG to {from_key}")
        except Exception as e:
            logger.trace(f"Failed to send PONG: {e}")

    async def send_peerlist(
        self, to_key: str, network: NetworkType, include_features: bool = False
    ) -> None:
        """
        Send peerlist to a peer.

        Args:
            to_key: Key of the peer to send to
            network: Network to filter peers by
            include_features: If True, include F: suffix with features for each peer.
                             This is enabled when the requesting peer supports peerlist_features.
        """
        logger.trace(
            f"send_peerlist called for {to_key}, network={network}, "
            f"include_features={include_features}"
        )

        # Always send a response, even if empty - clients wait for PEERLIST response
        if include_features:
            peers_with_features = self.peer_registry.get_peerlist_with_features(network)
            if not peers_with_features:
                peerlist_msg = ""
            else:
                entries = [
                    create_peerlist_entry(nick, loc, features=features)
                    for nick, loc, features in peers_with_features
                ]
                peerlist_msg = ",".join(entries)
        else:
            peers = self.peer_registry.get_peerlist_for_network(network)
            if not peers:
                peerlist_msg = ""
            else:
                entries = [create_peerlist_entry(nick, loc) for nick, loc in peers]
                peerlist_msg = ",".join(entries)

        envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload=peerlist_msg)

        try:
            await self.send_callback(to_key, envelope.to_bytes())
            logger.trace(f"Sent peerlist to {to_key} (include_features={include_features})")
        except Exception as e:
            logger.warning(f"Failed to send peerlist to {to_key}: {e}")

    async def _send_peer_location(self, to_location: str, peer_info: PeerInfo) -> None:
        if peer_info.onion_address == "NOT-SERVING-ONION":
            return

        entry = create_peerlist_entry(peer_info.nick, peer_info.location_string)
        envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload=entry)

        try:
            await self.send_callback(to_location, envelope.to_bytes())
        except Exception as e:
            logger.trace(f"Failed to send peer location: {e}")

    async def broadcast_peer_disconnect(self, peer_location: str, network: NetworkType) -> None:
        peer = self.peer_registry.get_by_location(peer_location)
        if not peer or not peer.nick:
            return

        entry = create_peerlist_entry(peer.nick, peer.location_string, disconnected=True)
        envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload=entry)

        # Pre-serialize envelope once instead of per-peer
        envelope_bytes = envelope.to_bytes()

        # Use generator to avoid building full target list in memory
        def target_generator() -> Iterator[tuple[str, str | None]]:
            for p in self.peer_registry.iter_connected(network):
                if p.location_string == peer_location:
                    continue
                peer_key = p.nick if p.location_string == "NOT-SERVING-ONION" else p.location_string
                yield (peer_key, p.nick)

        # Execute sends in batches to limit memory usage
        sent_count = await self._batched_broadcast_iter(target_generator(), envelope_bytes)

        logger.info(f"Broadcasted disconnect for {peer.nick} to {sent_count} peers")

    def get_offer_stats(self) -> dict:
        """Get statistics about tracked offers."""
        total_offers = sum(len(offers) for offers in self._peer_offers.values())
        peers_with_offers = len([k for k, v in self._peer_offers.items() if v])

        # Find peers with more than 2 offers
        peers_many_offers = []
        for peer_key, offers in self._peer_offers.items():
            if len(offers) > 2:
                peer_info = self.peer_registry.get_by_key(peer_key)
                nick = peer_info.nick if peer_info else peer_key
                peers_many_offers.append((nick, len(offers)))

        # Sort by offer count descending
        peers_many_offers.sort(key=lambda x: x[1], reverse=True)

        return {
            "total_offers": total_offers,
            "peers_with_offers": peers_with_offers,
            "peers_many_offers": peers_many_offers[:10],  # Top 10
        }

    def remove_peer_offers(self, peer_key: str) -> None:
        """Remove offer tracking for a disconnected peer."""
        self._peer_offers.pop(peer_key, None)

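The bounded-memory broadcast described in the _batched_broadcast_iter docstring reduces to the pattern below. This is a distilled, standalone sketch with a dummy send and generated targets, not the router's own code.

import asyncio
from collections.abc import Iterator

BATCH_SIZE = 50  # mirrors the default broadcast batch size


async def send(peer_key: str, data: bytes) -> None:
    await asyncio.sleep(0)  # stand-in for a real network write


async def broadcast_in_batches(targets: Iterator[str], data: bytes) -> int:
    """Consume targets lazily, holding at most BATCH_SIZE pending sends."""
    sent = 0
    batch: list[str] = []
    for peer_key in targets:
        batch.append(peer_key)
        if len(batch) >= BATCH_SIZE:
            await asyncio.gather(*(send(pk, data) for pk in batch))
            sent += len(batch)
            batch.clear()
    if batch:  # flush the final partial batch
        await asyncio.gather(*(send(pk, data) for pk in batch))
        sent += len(batch)
    return sent


print(asyncio.run(broadcast_in_batches((f"peer{i}" for i in range(120)), b"payload")))  # 120
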
Methods

async def broadcast_peer_disconnect(self,
                                    peer_location: str,
                                    network: NetworkType) -> None
async def broadcast_peer_disconnect(self, peer_location: str, network: NetworkType) -> None:
    peer = self.peer_registry.get_by_location(peer_location)
    if not peer or not peer.nick:
        return

    entry = create_peerlist_entry(peer.nick, peer.location_string, disconnected=True)
    envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload=entry)

    # Pre-serialize envelope once instead of per-peer
    envelope_bytes = envelope.to_bytes()

    # Use generator to avoid building full target list in memory
    def target_generator() -> Iterator[tuple[str, str | None]]:
        for p in self.peer_registry.iter_connected(network):
            if p.location_string == peer_location:
                continue
            peer_key = p.nick if p.location_string == "NOT-SERVING-ONION" else p.location_string
            yield (peer_key, p.nick)

    # Execute sends in batches to limit memory usage
    sent_count = await self._batched_broadcast_iter(target_generator(), envelope_bytes)

    logger.info(f"Broadcasted disconnect for {peer.nick} to {sent_count} peers")
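
Broadcasts a PEERLIST entry marked as disconnected to every other connected peer on the given network, using the same batched send path as public messages. Below is a sketch of how a server-side connection handler might call it; the handler name, the way the peer key is derived, and the NetworkType import path are assumptions.

from directory_server.message_router import MessageRouter
from directory_server.peer_registry import NetworkType  # assumed import path


async def on_connection_lost(
    router: MessageRouter, peer_key: str, peer_location: str, network: NetworkType
) -> None:
    # Tell the remaining peers this peer is gone, then drop its tracked offers.
    await router.broadcast_peer_disconnect(peer_location, network)
    router.remove_peer_offers(peer_key)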

def get_offer_stats(self) -> dict
def get_offer_stats(self) -> dict:
    """Get statistics about tracked offers."""
    total_offers = sum(len(offers) for offers in self._peer_offers.values())
    peers_with_offers = len([k for k, v in self._peer_offers.items() if v])

    # Find peers with more than 2 offers
    peers_many_offers = []
    for peer_key, offers in self._peer_offers.items():
        if len(offers) > 2:
            peer_info = self.peer_registry.get_by_key(peer_key)
            nick = peer_info.nick if peer_info else peer_key
            peers_many_offers.append((nick, len(offers)))

    # Sort by offer count descending
    peers_many_offers.sort(key=lambda x: x[1], reverse=True)

    return {
        "total_offers": total_offers,
        "peers_with_offers": peers_with_offers,
        "peers_many_offers": peers_many_offers[:10],  # Top 10
    }

Get statistics about tracked offers.

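Given a router instance (constructed as in the module-level sketch), the returned mapping has the shape below; the values are illustrative.

stats = router.get_offer_stats()
# {
#     "total_offers": 37,                      # order IDs tracked across all peers
#     "peers_with_offers": 12,                 # peers with at least one tracked offer
#     "peers_many_offers": [("nickA", 6), ("nickB", 4)],  # top 10 peers with more than 2 offers
# }
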
def remove_peer_offers(self, peer_key: str) -> None
def remove_peer_offers(self, peer_key: str) -> None:
    """Remove offer tracking for a disconnected peer."""
    self._peer_offers.pop(peer_key, None)

Remove offer tracking for a disconnected peer.

async def route_message(self,
                        envelope: MessageEnvelope,
                        from_key: str) -> None
async def route_message(self, envelope: MessageEnvelope, from_key: str) -> None:
    if envelope.message_type == MessageType.PUBMSG:
        await self._handle_public_message(envelope, from_key)
    elif envelope.message_type == MessageType.PRIVMSG:
        await self._handle_private_message(envelope, from_key)
    elif envelope.message_type == MessageType.GETPEERLIST:
        await self._handle_peerlist_request(from_key)
    elif envelope.message_type == MessageType.PING:
        await self._handle_ping(from_key)
    else:
        logger.debug(f"Unhandled message type: {envelope.message_type}")
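
Dispatches an incoming envelope by message type: PUBMSG is rebroadcast to the sender's network, PRIVMSG is forwarded to the addressed peer, GETPEERLIST is answered with a PEERLIST, PING with a PONG, and anything else is logged and dropped. A sketch of feeding it from a per-connection read loop follows; read_envelope is a hypothetical placeholder for the server's framing and decoding.

from directory_server.message_router import MessageRouter


async def read_envelope(reader):
    """Hypothetical helper: decode the next MessageEnvelope, or None on EOF."""
    ...


async def connection_reader(router: MessageRouter, from_key: str, reader) -> None:
    # Hand every decoded envelope to the router under the connection's key.
    while (envelope := await read_envelope(reader)) is not None:
        await router.route_message(envelope, from_key)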

async def send_peerlist(self,
                        to_key: str,
                        network: NetworkType,
                        include_features: bool = False) -> None
async def send_peerlist(
    self, to_key: str, network: NetworkType, include_features: bool = False
) -> None:
    """
    Send peerlist to a peer.

    Args:
        to_key: Key of the peer to send to
        network: Network to filter peers by
        include_features: If True, include F: suffix with features for each peer.
                         This is enabled when the requesting peer supports peerlist_features.
    """
    logger.trace(
        f"send_peerlist called for {to_key}, network={network}, "
        f"include_features={include_features}"
    )

    # Always send a response, even if empty - clients wait for PEERLIST response
    if include_features:
        peers_with_features = self.peer_registry.get_peerlist_with_features(network)
        if not peers_with_features:
            peerlist_msg = ""
        else:
            entries = [
                create_peerlist_entry(nick, loc, features=features)
                for nick, loc, features in peers_with_features
            ]
            peerlist_msg = ",".join(entries)
    else:
        peers = self.peer_registry.get_peerlist_for_network(network)
        if not peers:
            peerlist_msg = ""
        else:
            entries = [create_peerlist_entry(nick, loc) for nick, loc in peers]
            peerlist_msg = ",".join(entries)

    envelope = MessageEnvelope(message_type=MessageType.PEERLIST, payload=peerlist_msg)

    try:
        await self.send_callback(to_key, envelope.to_bytes())
        logger.trace(f"Sent peerlist to {to_key} (include_features={include_features})")
    except Exception as e:
        logger.warning(f"Failed to send peerlist to {to_key}: {e}")

Send peerlist to a peer.

Args

to_key: Key of the peer to send to.
network: Network to filter peers by.
include_features: If True, include an F: suffix with features for each peer. This is enabled when the requesting peer supports peerlist_features.
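
For example, a handshake path might push the current peer list to a newly registered peer, opting into the feature suffix only when the peer advertised support, exactly as _handle_peerlist_request does internally. The handler name here is made up.

from directory_server.message_router import MessageRouter


async def on_handshake_complete(router: MessageRouter, peer_key: str) -> None:
    peer = router.peer_registry.get_by_key(peer_key)
    if peer is None:
        return
    # Opt into the F: feature suffix only if the peer advertised peerlist_features.
    include_features = peer.features.get("peerlist_features", False)
    await router.send_peerlist(peer_key, peer.network, include_features=include_features)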