Module jmcore.directory_client

Shared DirectoryClient for connecting to JoinMarket directory nodes.

This module provides a unified client for: - Orderbook watcher (passive monitoring) - Maker (announcing offers) - Taker (fetching orderbooks and coordinating CoinJoins)

Functions

def parse_fidelity_bond_proof(proof_base64: str, maker_nick: str, taker_nick: str, verify: bool = True) ‑> dict[str, typing.Any] | None
Expand source code
def parse_fidelity_bond_proof(
    proof_base64: str, maker_nick: str, taker_nick: str, verify: bool = True
) -> dict[str, Any] | None:
    """
    Parse and optionally verify a fidelity bond proof from base64-encoded binary data.

    Args:
        proof_base64: Base64-encoded bond proof
        maker_nick: Maker's nick
        taker_nick: Taker's nick (requesting party)
        verify: If True, verify both signatures in the proof (default: True)

    Returns:
        Dict with bond details or None if parsing/verification fails
    """
    # First, verify the signatures if requested
    if verify:
        is_valid, verified_data, error = verify_fidelity_bond_proof(
            proof_base64, maker_nick, taker_nick
        )
        if not is_valid:
            logger.warning(f"Fidelity bond proof verification failed for {maker_nick}: {error}")
            return None

    # Parse the proof data (also extracts redeem script)
    try:
        decoded_data = base64.b64decode(proof_base64)
    except (binascii.Error, ValueError) as e:
        logger.warning(f"Failed to decode bond proof: {e}")
        return None

    if len(decoded_data) != 252:
        logger.warning(f"Invalid bond proof length: {len(decoded_data)}, expected 252")
        return None

    try:
        unpacked_data = struct.unpack("<72s72s33sH33s32sII", decoded_data)

        txid = unpacked_data[5]
        vout = unpacked_data[6]
        locktime = unpacked_data[7]
        utxo_pub = unpacked_data[4]
        cert_pub = unpacked_data[2]
        cert_expiry_raw = unpacked_data[3]
        cert_expiry = cert_expiry_raw * 2016

        utxo_pub_hex = binascii.hexlify(utxo_pub).decode("ascii")
        redeem_script = mk_freeze_script(utxo_pub_hex, locktime)
        redeem_script_hex = binascii.hexlify(redeem_script).decode("ascii")
        p2wsh_script = redeem_script_to_p2wsh_script(redeem_script)
        p2wsh_script_hex = binascii.hexlify(p2wsh_script).decode("ascii")

        return {
            "maker_nick": maker_nick,
            "taker_nick": taker_nick,
            "utxo_txid": binascii.hexlify(txid).decode("ascii"),
            "utxo_vout": vout,
            "locktime": locktime,
            "utxo_pub": utxo_pub_hex,
            "cert_pub": binascii.hexlify(cert_pub).decode("ascii"),
            "cert_expiry": cert_expiry,
            "proof": proof_base64,
            "redeem_script": redeem_script_hex,
            "p2wsh_script": p2wsh_script_hex,
        }
    except Exception as e:
        logger.warning(f"Failed to unpack bond proof: {e}")
        return None

Parse and optionally verify a fidelity bond proof from base64-encoded binary data.

Args

proof_base64
Base64-encoded bond proof
maker_nick
Maker's nick
taker_nick
Taker's nick (requesting party)
verify
If True, verify both signatures in the proof (default: True)

Returns

Dict with bond details or None if parsing/verification fails

Classes

class DirectoryClient (host: str,
port: int,
network: str,
nick_identity: NickIdentity | None = None,
location: str = 'NOT-SERVING-ONION',
socks_host: str = '127.0.0.1',
socks_port: int = 9050,
timeout: float = 30.0,
max_message_size: int = 2097152,
on_disconnect: Callable[[], None] | None = None,
neutrino_compat: bool = False,
peerlist_timeout: float = 60.0)
Expand source code
class DirectoryClient:
    """
    Client for connecting to JoinMarket directory servers.

    Supports:
    - Direct TCP connections (for local/dev)
    - Tor connections (for .onion addresses)
    - Handshake protocol
    - Peerlist fetching
    - Orderbook fetching
    - Continuous listening for updates
    """

    def __init__(
        self,
        host: str,
        port: int,
        network: str,
        nick_identity: NickIdentity | None = None,
        location: str = "NOT-SERVING-ONION",
        socks_host: str = "127.0.0.1",
        socks_port: int = 9050,
        timeout: float = 30.0,
        max_message_size: int = 2097152,
        on_disconnect: Callable[[], None] | None = None,
        neutrino_compat: bool = False,
        peerlist_timeout: float = 60.0,
    ) -> None:
        """
        Initialize DirectoryClient.

        Args:
            host: Directory server hostname or .onion address
            port: Directory server port
            network: Bitcoin network (mainnet, testnet, signet, regtest)
            nick_identity: NickIdentity for message signing (generated if None)
            location: Our location string (onion address or NOT-SERVING-ONION)
            socks_host: SOCKS proxy host for Tor
            socks_port: SOCKS proxy port for Tor
            timeout: Connection timeout in seconds
            max_message_size: Maximum message size in bytes
            on_disconnect: Callback when connection drops
            neutrino_compat: Advertise support for Neutrino-compatible UTXO metadata
            peerlist_timeout: Timeout for first PEERLIST chunk (default 60s, subsequent chunks use 5s)
        """
        self.host = host
        self.port = port
        self.network = network
        self.location = location
        self.socks_host = socks_host
        self.socks_port = socks_port
        self.timeout = timeout
        self.max_message_size = max_message_size
        self.connection: TCPConnection | None = None
        self.nick_identity = nick_identity or NickIdentity(JM_VERSION)
        self.nick = self.nick_identity.nick
        # hostid retained for possible future use (e.g., logging, debugging)
        # Note: NOT used for message signing - always use ONION_HOSTID constant instead
        self.hostid = host
        # Offers indexed by (counterparty, oid) with timestamp metadata
        self.offers: dict[tuple[str, int], OfferWithTimestamp] = {}
        # Bonds indexed by UTXO key (txid:vout)
        self.bonds: dict[str, FidelityBond] = {}
        # Reverse index: bond UTXO key -> set of (counterparty, oid) keys that use this bond
        # Used for deduplication when same bond is used by different nicks
        self._bond_to_offers: dict[str, set[tuple[str, int]]] = {}
        self.peer_features: dict[str, dict[str, bool]] = {}  # nick -> features dict
        # Active peers from last peerlist (nick -> location)
        self._active_peers: dict[str, str] = {}
        self.running = False
        self.on_disconnect = on_disconnect
        self.initial_orderbook_received = False
        self.last_orderbook_request_time: float = 0.0
        self.last_offer_received_time: float | None = None
        self.neutrino_compat = neutrino_compat

        # Version negotiation state (set after handshake)
        self.negotiated_version: int | None = None
        self.directory_neutrino_compat: bool = False
        self.directory_peerlist_features: bool = False  # True if directory supports F: suffix

        # Directory metadata from handshake
        self.directory_motd: str | None = None
        self.directory_nick: str | None = None
        self.directory_proto_ver_min: int | None = None
        self.directory_proto_ver_max: int | None = None
        self.directory_features: dict[str, bool] = {}

        # Timing intervals
        self.peerlist_check_interval = 1800.0
        self.orderbook_refresh_interval = 1800.0
        self.orderbook_retry_interval = 300.0
        self.zero_offer_retry_interval = 600.0

        # Peerlist support tracking
        # If the directory doesn't support getpeerlist (e.g., reference implementation),
        # we track this to avoid spamming unsupported requests
        self._peerlist_supported: bool | None = None  # None = unknown, True/False = known
        self._last_peerlist_request_time: float = 0.0
        self._peerlist_min_interval: float = 60.0  # Minimum seconds between peerlist requests
        self._peerlist_timeout: float = peerlist_timeout  # Timeout for first peerlist chunk
        self._peerlist_chunk_timeout: float = (
            5.0  # Timeout between chunks (end of chunked response)
        )
        self._peerlist_timeout_count: int = 0  # Track consecutive timeouts

        # Message buffer for messages received while waiting for specific responses
        # (e.g., PEERLIST). These messages should be processed, not discarded.
        self._message_buffer: asyncio.Queue[dict[str, Any]] = asyncio.Queue()

    async def connect(self) -> None:
        """Connect to the directory server and perform handshake."""
        try:
            logger.debug(f"DirectoryClient.connect: connecting to {self.host}:{self.port}")
            if not self.host.endswith(".onion"):
                self.connection = await connect_direct(
                    self.host,
                    self.port,
                    self.max_message_size,
                    self.timeout,
                )
                logger.debug("DirectoryClient.connect: direct connection established")
            else:
                self.connection = await connect_via_tor(
                    self.host,
                    self.port,
                    self.socks_host,
                    self.socks_port,
                    self.max_message_size,
                    self.timeout,
                )
                logger.debug("DirectoryClient.connect: tor connection established")
            logger.debug("DirectoryClient.connect: starting handshake")
            await self._handshake()
            logger.debug("DirectoryClient.connect: handshake complete")
        except Exception as e:
            logger.error(f"Failed to connect to {self.host}:{self.port}: {e}", exc_info=True)
            # Clean up connection if handshake failed
            if self.connection:
                with contextlib.suppress(Exception):
                    await self.connection.close()
                self.connection = None
            raise DirectoryClientError(f"Connection failed: {e}") from e

    async def _handshake(self) -> None:
        """
        Perform directory server handshake with feature negotiation.

        We use proto-ver=5 for reference implementation compatibility.
        Features like neutrino_compat are negotiated independently via
        the features dict in the handshake payload.
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        # Build our feature set - always include peerlist_features to indicate we support
        # the extended peerlist format with F: suffix for feature information
        our_features: set[str] = {FEATURE_PEERLIST_FEATURES}
        if self.neutrino_compat:
            our_features.add(FEATURE_NEUTRINO_COMPAT)
        feature_set = FeatureSet(features=our_features)

        # Send our handshake with current version and features
        handshake_data = create_handshake_request(
            nick=self.nick,
            location=self.location,
            network=self.network,
            directory=False,
            features=feature_set,
        )
        logger.debug(f"DirectoryClient._handshake: created handshake data: {handshake_data}")
        handshake_msg = {
            "type": MessageType.HANDSHAKE.value,
            "line": json.dumps(handshake_data),
        }
        logger.debug("DirectoryClient._handshake: sending handshake message")
        await self.connection.send(json.dumps(handshake_msg).encode("utf-8"))
        logger.debug("DirectoryClient._handshake: handshake sent, waiting for response")

        # Receive and parse directory's response
        response_data = await asyncio.wait_for(self.connection.receive(), timeout=self.timeout)
        logger.debug(f"DirectoryClient._handshake: received response: {response_data[:200]!r}")
        response = json.loads(response_data.decode("utf-8"))

        if response["type"] not in (MessageType.HANDSHAKE.value, MessageType.DN_HANDSHAKE.value):
            raise DirectoryClientError(f"Unexpected response type: {response['type']}")

        handshake_response = json.loads(response["line"])
        if not handshake_response.get("accepted", False):
            raise DirectoryClientError("Handshake rejected")

        # Extract directory's version range
        # Reference directories only send "proto-ver" (single value, typically 5)
        dir_ver_min = handshake_response.get("proto-ver-min")
        dir_ver_max = handshake_response.get("proto-ver-max")

        if dir_ver_min is None or dir_ver_max is None:
            # Reference directory: only sends single proto-ver
            dir_version = handshake_response.get("proto-ver", 5)
            dir_ver_min = dir_ver_max = dir_version

        # Verify compatibility with our version (we only support v5)
        if not (dir_ver_min <= JM_VERSION <= dir_ver_max):
            raise DirectoryClientError(
                f"No compatible protocol version: we support v{JM_VERSION}, "
                f"directory supports [{dir_ver_min}, {dir_ver_max}]"
            )

        # Use v5 (our only supported version)
        self.negotiated_version = JM_VERSION

        # Check if directory supports Neutrino-compatible metadata
        self.directory_neutrino_compat = peer_supports_neutrino_compat(handshake_response)

        # Check if directory supports peerlist_features (extended peerlist with F: suffix)
        dir_features = handshake_response.get("features", {})
        self.directory_peerlist_features = dir_features.get(FEATURE_PEERLIST_FEATURES, False)

        # Store directory metadata
        self.directory_motd = handshake_response.get("motd")
        self.directory_nick = handshake_response.get("nick")
        self.directory_proto_ver_min = dir_ver_min
        self.directory_proto_ver_max = dir_ver_max
        self.directory_features = dir_features

        logger.info(
            f"Handshake successful with {self.host}:{self.port} (nick: {self.nick}, "
            f"negotiated_version: v{self.negotiated_version}, "
            f"neutrino_compat: {self.directory_neutrino_compat}, "
            f"peerlist_features: {self.directory_peerlist_features})"
        )

    async def get_peerlist(self) -> list[str] | None:
        """
        Fetch the current list of connected peers.

        Note: Reference implementation directories do NOT support GETPEERLIST.
        This method shares peerlist support tracking with get_peerlist_with_features().

        The directory may send multiple PEERLIST messages (chunked response) to
        avoid overwhelming slow Tor connections. This method accumulates peers
        from all chunks.

        Returns:
            List of active peer nicks. Returns empty list if directory doesn't
            support GETPEERLIST. Returns None if rate-limited (use cached data).
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        # Skip if we already know this directory doesn't support GETPEERLIST
        # (only applies to directories that didn't announce peerlist_features)
        if self._peerlist_supported is False and not self.directory_peerlist_features:
            logger.debug("Skipping GETPEERLIST - directory doesn't support it")
            return []

        # Rate-limit peerlist requests to avoid spamming
        import time

        current_time = time.time()
        if current_time - self._last_peerlist_request_time < self._peerlist_min_interval:
            logger.debug(
                f"Skipping GETPEERLIST - rate limited "
                f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)"
            )
            return None

        self._last_peerlist_request_time = current_time

        getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""}
        logger.debug("Sending GETPEERLIST request")
        await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8"))

        start_time = asyncio.get_event_loop().time()

        # Timeout for waiting for the first PEERLIST response
        first_response_timeout = (
            self._peerlist_timeout if self.directory_peerlist_features else self.timeout
        )

        # Timeout between chunks - when this expires after receiving at least one
        # PEERLIST message, we know the directory has finished sending all chunks
        inter_chunk_timeout = self._peerlist_chunk_timeout

        # Accumulate peers from multiple PEERLIST chunks
        all_peers: list[str] = []
        chunks_received = 0
        got_first_response = False

        while True:
            elapsed = asyncio.get_event_loop().time() - start_time

            # Determine timeout for this receive
            if not got_first_response:
                remaining = first_response_timeout - elapsed
                if remaining <= 0:
                    self._handle_peerlist_timeout()
                    return []
                receive_timeout = remaining
            else:
                receive_timeout = inter_chunk_timeout

            try:
                response_data = await asyncio.wait_for(
                    self.connection.receive(), timeout=receive_timeout
                )
                response = json.loads(response_data.decode("utf-8"))
                msg_type = response.get("type")

                if msg_type == MessageType.PEERLIST.value:
                    got_first_response = True
                    chunks_received += 1
                    peerlist_str = response.get("line", "")

                    # Parse this chunk
                    chunk_peers: list[str] = []
                    if peerlist_str:
                        for entry in peerlist_str.split(","):
                            if not entry or not entry.strip():
                                continue
                            if NICK_PEERLOCATOR_SEPARATOR not in entry:
                                logger.debug(f"Skipping metadata entry in peerlist: '{entry}'")
                                continue
                            try:
                                nick, location, disconnected, _features = parse_peerlist_entry(
                                    entry
                                )
                                logger.debug(
                                    f"Parsed peer: {nick} at {location}, "
                                    f"disconnected={disconnected}"
                                )
                                if not disconnected:
                                    chunk_peers.append(nick)
                            except ValueError as e:
                                logger.warning(f"Failed to parse peerlist entry '{entry}': {e}")
                                continue

                    all_peers.extend(chunk_peers)
                    logger.debug(
                        f"Received PEERLIST chunk {chunks_received} with "
                        f"{len(chunk_peers)} peers (total: {len(all_peers)})"
                    )
                    continue

                # Buffer unexpected messages
                logger.trace(
                    f"Buffering unexpected message type {msg_type} while waiting for PEERLIST"
                )
                await self._message_buffer.put(response)

            except TimeoutError:
                if not got_first_response:
                    self._handle_peerlist_timeout()
                    return []
                # Inter-chunk timeout means we're done
                break

            except Exception as e:
                logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}")
                elapsed = asyncio.get_event_loop().time() - start_time
                if not got_first_response and elapsed > first_response_timeout:
                    self._handle_peerlist_timeout()
                    return []
                if got_first_response:
                    break

        # Mark peerlist as supported since we got a valid response
        self._peerlist_supported = True
        self._peerlist_timeout_count = 0

        logger.info(f"Received {len(all_peers)} active peers from {self.host}:{self.port}")
        return all_peers

    async def get_peerlist_with_features(self) -> list[tuple[str, str, FeatureSet]]:
        """
        Fetch the current list of connected peers with their features.

        Uses the standard GETPEERLIST message. If the directory supports
        peerlist_features, the response will include F: suffix with features.

        Note: Reference implementation directories do NOT support GETPEERLIST.
        This method tracks whether the directory supports it and skips requests
        to unsupported directories to avoid spamming warnings in their logs.

        The directory may send multiple PEERLIST messages (chunked response) to
        avoid overwhelming slow Tor connections. This method accumulates peers
        from all chunks until no more PEERLIST messages arrive within the
        inter-chunk timeout.

        Returns:
            List of (nick, location, features) tuples for active peers.
            Features will be empty for directories that don't support peerlist_features.
            Returns empty list if directory doesn't support GETPEERLIST or is rate-limited.
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        # Skip if we already know this directory doesn't support GETPEERLIST
        # (only applies to directories that didn't announce peerlist_features)
        if self._peerlist_supported is False and not self.directory_peerlist_features:
            logger.debug("Skipping GETPEERLIST - directory doesn't support it")
            return []

        # Rate-limit peerlist requests to avoid spamming
        import time

        current_time = time.time()
        if current_time - self._last_peerlist_request_time < self._peerlist_min_interval:
            logger.debug(
                f"Skipping GETPEERLIST - rate limited "
                f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)"
            )
            return []  # Return empty - will use offers for nick tracking

        self._last_peerlist_request_time = current_time

        getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""}
        logger.debug("Sending GETPEERLIST request")
        await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8"))

        start_time = asyncio.get_event_loop().time()

        # Timeout for waiting for the first PEERLIST response
        # Use longer timeout for directories that support peerlist_features
        first_response_timeout = (
            self._peerlist_timeout if self.directory_peerlist_features else self.timeout
        )

        # Timeout between chunks - when this expires after receiving at least one
        # PEERLIST message, we know the directory has finished sending all chunks
        inter_chunk_timeout = self._peerlist_chunk_timeout

        # Accumulate peers from multiple PEERLIST chunks
        all_peers: list[tuple[str, str, FeatureSet]] = []
        chunks_received = 0
        got_first_response = False

        while True:
            elapsed = asyncio.get_event_loop().time() - start_time

            # Determine timeout for this receive
            if not got_first_response:
                # Waiting for first PEERLIST - use full timeout
                remaining = first_response_timeout - elapsed
                if remaining <= 0:
                    self._handle_peerlist_timeout()
                    return []
                receive_timeout = remaining
            else:
                # Already received at least one chunk - use shorter inter-chunk timeout
                receive_timeout = inter_chunk_timeout

            try:
                response_data = await asyncio.wait_for(
                    self.connection.receive(), timeout=receive_timeout
                )
                response = json.loads(response_data.decode("utf-8"))
                msg_type = response.get("type")

                if msg_type == MessageType.PEERLIST.value:
                    got_first_response = True
                    chunks_received += 1
                    peerlist_str = response.get("line", "")
                    chunk_peers = self._handle_peerlist_response(peerlist_str)
                    all_peers.extend(chunk_peers)
                    logger.debug(
                        f"Received PEERLIST chunk {chunks_received} with "
                        f"{len(chunk_peers)} peers (total: {len(all_peers)})"
                    )
                    # Continue to check for more chunks
                    continue

                # Buffer unexpected messages (like PUBMSG offers) for later processing
                logger.trace(
                    f"Buffering unexpected message type {msg_type} while waiting for PEERLIST"
                )
                await self._message_buffer.put(response)

            except TimeoutError:
                if not got_first_response:
                    # Never received any PEERLIST - this is a real timeout
                    self._handle_peerlist_timeout()
                    return []
                # Received at least one chunk, inter-chunk timeout means we're done
                logger.debug(
                    f"Peerlist complete: received {len(all_peers)} peers "
                    f"in {chunks_received} chunks"
                )
                break

            except Exception as e:
                logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}")
                elapsed = asyncio.get_event_loop().time() - start_time
                if not got_first_response and elapsed > first_response_timeout:
                    self._handle_peerlist_timeout()
                    return []
                # If we already have some data, return what we have
                if got_first_response:
                    break

        # Success - reset timeout counter and mark as supported
        self._peerlist_timeout_count = 0
        self._peerlist_supported = True
        return all_peers

    def _handle_peerlist_timeout(self) -> None:
        """Handle timeout when waiting for PEERLIST response."""
        self._peerlist_timeout_count += 1

        if self.directory_peerlist_features:
            # Directory announced peerlist_features during handshake, so it supports
            # GETPEERLIST. Timeout is likely due to large peerlist or network issues.
            logger.warning(
                f"Timed out waiting for PEERLIST from {self.host}:{self.port} "
                f"(attempt {self._peerlist_timeout_count}) - "
                "peerlist may be large or network is slow"
            )
            # Don't disable peerlist requests - directory supports it, just slow
        else:
            # Directory didn't announce peerlist_features - likely reference impl
            logger.info(
                f"Timed out waiting for PEERLIST from {self.host}:{self.port} - "
                "directory likely doesn't support GETPEERLIST (reference implementation)"
            )
            self._peerlist_supported = False

    def _handle_peerlist_response(self, peerlist_str: str) -> list[tuple[str, str, FeatureSet]]:
        """
        Process a PEERLIST response and update internal state.

        Note: Some directories send multiple partial PEERLIST responses (one per peer)
        instead of a single complete list. We handle this by only adding/updating
        peers from each response, not removing nicks that aren't present.

        Removal of stale offers is handled by:
        1. Explicit disconnect markers (;D suffix) in peerlist entries
        2. The periodic peerlist refresh in OrderbookAggregator
        3. Staleness cleanup for directories without GETPEERLIST support

        Args:
            peerlist_str: Comma-separated list of peer entries

        Returns:
            List of active peers (nick, location, features) in this response
        """
        logger.debug(f"Peerlist string: {peerlist_str}")

        # Mark peerlist as supported since we got a valid response
        self._peerlist_supported = True

        if not peerlist_str:
            # Empty peerlist response - just return empty list
            # Don't remove offers as this might be a partial response
            return []

        peers: list[tuple[str, str, FeatureSet]] = []
        explicitly_disconnected: list[str] = []

        for entry in peerlist_str.split(","):
            # Skip empty entries
            if not entry or not entry.strip():
                continue
            # Skip entries without separator - these are metadata (e.g., 'peerlist_features')
            # from the reference implementation, not actual peer entries
            if NICK_PEERLOCATOR_SEPARATOR not in entry:
                logger.debug(f"Skipping metadata entry in peerlist: '{entry}'")
                continue
            try:
                nick, location, disconnected, features = parse_peerlist_entry(entry)
                logger.debug(
                    f"Parsed peer: {nick} at {location}, "
                    f"disconnected={disconnected}, features={features.to_comma_string()}"
                )
                if disconnected:
                    # Nick explicitly marked as disconnected - remove their offers
                    explicitly_disconnected.append(nick)
                else:
                    peers.append((nick, location, features))
                    # Update/add this nick to active peers
                    self._active_peers[nick] = location
                    # Merge features into peer_features cache (never overwrite/downgrade)
                    # This prevents losing features when receiving peerlist from directories
                    # that don't support peerlist_features
                    features_dict = features.to_dict()
                    self._merge_peer_features(nick, features_dict)

                    # Update features on any cached offers for this peer
                    # This fixes the race condition where offers are stored before
                    # peerlist response arrives with features
                    self._update_offer_features(nick, features_dict)
            except ValueError as e:
                logger.warning(f"Failed to parse peerlist entry '{entry}': {e}")
                continue

        # Only remove offers for nicks that are explicitly marked as disconnected
        for nick in explicitly_disconnected:
            self.remove_offers_for_nick(nick)

        logger.trace(
            f"Received {len(peers)} active peers with features from {self.host}:{self.port}"
            + (
                f", {len(explicitly_disconnected)} explicitly disconnected"
                if explicitly_disconnected
                else ""
            )
        )
        return peers

    async def listen_for_messages(self, duration: float = 5.0) -> list[dict[str, Any]]:
        """
        Listen for messages for a specified duration.

        This method collects all messages received within the specified duration.
        It properly handles connection closed errors by raising DirectoryClientError.

        Args:
            duration: How long to listen in seconds

        Returns:
            List of received messages

        Raises:
            DirectoryClientError: If not connected or connection is lost
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        # Check connection state before starting
        if not self.connection.is_connected():
            raise DirectoryClientError("Connection closed")

        messages: list[dict[str, Any]] = []
        start_time = asyncio.get_event_loop().time()

        # First, drain any buffered messages into our result list
        # These are messages that were received while waiting for other responses
        while not self._message_buffer.empty():
            try:
                buffered_msg = self._message_buffer.get_nowait()
                logger.trace(
                    f"Processing buffered message type {buffered_msg.get('type')}: "
                    f"{buffered_msg.get('line', '')[:80]}..."
                )
                messages.append(buffered_msg)
            except asyncio.QueueEmpty:
                break

        while asyncio.get_event_loop().time() - start_time < duration:
            try:
                remaining_time = duration - (asyncio.get_event_loop().time() - start_time)
                if remaining_time <= 0:
                    break

                response_data = await asyncio.wait_for(
                    self.connection.receive(), timeout=remaining_time
                )
                response = json.loads(response_data.decode("utf-8"))
                logger.trace(
                    f"Received message type {response.get('type')}: "
                    f"{response.get('line', '')[:80]}..."
                )
                messages.append(response)

            except TimeoutError:
                # Normal timeout - no more messages within duration
                break
            except Exception as e:
                # Connection errors should propagate up so caller can reconnect
                error_msg = str(e).lower()
                if "connection" in error_msg and ("closed" in error_msg or "lost" in error_msg):
                    raise DirectoryClientError(f"Connection lost: {e}") from e
                # Other errors (JSON parse, etc) - log and continue
                logger.warning(f"Error processing message: {e}")
                continue

        logger.trace(f"Collected {len(messages)} messages in {duration}s")
        return messages

    async def fetch_orderbooks(self) -> tuple[list[Offer], list[FidelityBond]]:
        """
        Fetch orderbooks from all connected peers.

        Trusts the directory's orderbook as authoritative - if a maker has an offer
        in the directory, they are considered online. The directory server maintains
        the connection state and removes offers when makers disconnect.

        Returns:
            Tuple of (offers, fidelity_bonds)
        """
        # Use get_peerlist_with_features to populate peer_features cache for neutrino_compat
        # detection. The peerlist itself is not used for offer filtering.
        peers_with_features = await self.get_peerlist_with_features()
        offers: list[Offer] = []
        bonds: list[FidelityBond] = []
        bond_utxo_set: set[str] = set()

        # Log peer count for visibility (but don't filter based on peerlist)
        if peers_with_features:
            logger.info(f"Found {len(peers_with_features)} peers on {self.host}:{self.port}")

        if not self.connection:
            raise DirectoryClientError("Not connected")

        pubmsg = {
            "type": MessageType.PUBMSG.value,
            "line": f"{self.nick}!PUBLIC!!orderbook",
        }
        await self.connection.send(json.dumps(pubmsg).encode("utf-8"))
        logger.debug("Sent !orderbook broadcast to PUBLIC")

        # Based on empirical testing with the main JoinMarket directory server over Tor,
        # the response time distribution shows significant delays:
        # - Median: ~38s (50% of offers)
        # - 75th percentile: ~65s
        # - 90th percentile: ~93s
        # - 95th percentile: ~101s
        # - 99th percentile: ~115s
        # - Max observed: ~119s
        # Using 120s (95th percentile + 20% buffer) ensures we capture ~95% of all offers.
        # The wide distribution is due to Tor latency and maker response times.
        #
        # For testing, this can be overridden via JM_ORDERBOOK_WAIT_TIME environment variable.
        listen_duration = float(os.environ.get("JM_ORDERBOOK_WAIT_TIME", "120.0"))
        logger.info(f"Listening for offer announcements for {listen_duration} seconds...")
        messages = await self.listen_for_messages(duration=listen_duration)

        logger.info(f"Received {len(messages)} messages, parsing offers...")

        for response in messages:
            try:
                msg_type = response.get("type")
                line = response["line"]

                # Handle PEERLIST messages to keep peer features and active peers updated
                if msg_type == MessageType.PEERLIST.value:
                    try:
                        self._handle_peerlist_response(line)
                        logger.debug("Processed PEERLIST during orderbook fetch")
                    except Exception as e:
                        logger.debug(f"Failed to process PEERLIST: {e}")
                    continue

                if msg_type not in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value):
                    logger.debug(f"Skipping message type {msg_type}")
                    continue

                logger.debug(f"Processing message type {msg_type}: {line[:100]}...")

                parts = line.split(COMMAND_PREFIX)
                if len(parts) < 3:
                    logger.debug(f"Message has insufficient parts: {len(parts)}")
                    continue

                from_nick = parts[0]
                to_nick = parts[1]
                rest = COMMAND_PREFIX.join(parts[2:])

                if not rest.strip():
                    logger.debug("Empty message content")
                    continue

                offer_types = ["sw0absoffer", "sw0reloffer", "swabsoffer", "swreloffer"]
                parsed = False
                for offer_type in offer_types:
                    if rest.startswith(offer_type):
                        try:
                            # Split on '!' to extract flags (neutrino, tbond)
                            # Format: sw0reloffer 0 750000 790107726787 500 0.001!neutrino!tbond <proof>
                            # NOTE: !neutrino in offers is deprecated - primary detection is via
                            # handshake features. This parsing is kept for backwards compatibility.
                            rest_parts = rest.split(COMMAND_PREFIX)
                            offer_line = rest_parts[0]
                            bond_data = None
                            neutrino_compat = False

                            # Parse flags after the offer line (backwards compat for !neutrino)
                            for flag_part in rest_parts[1:]:
                                if flag_part.startswith("neutrino"):
                                    neutrino_compat = True
                                    logger.debug(f"Maker {from_nick} requires neutrino_compat")
                                elif flag_part.startswith("tbond "):
                                    bond_parts = flag_part[6:].split()
                                    if bond_parts:
                                        bond_proof_b64 = bond_parts[0]
                                        # For PRIVMSG, the maker signs with taker's actual nick
                                        # For PUBMSG, both nicks are the maker's (self-signed)
                                        is_privmsg = msg_type == MessageType.PRIVMSG.value
                                        taker_nick_for_proof = to_nick if is_privmsg else from_nick
                                        bond_data = parse_fidelity_bond_proof(
                                            bond_proof_b64, from_nick, taker_nick_for_proof
                                        )
                                        if bond_data:
                                            logger.debug(
                                                f"Parsed fidelity bond from {from_nick}: "
                                                f"txid={bond_data['utxo_txid'][:16]}..., "
                                                f"locktime={bond_data['locktime']}"
                                            )

                                            utxo_str = (
                                                f"{bond_data['utxo_txid']}:{bond_data['utxo_vout']}"
                                            )
                                            if utxo_str not in bond_utxo_set:
                                                bond_utxo_set.add(utxo_str)
                                                bond = FidelityBond(
                                                    counterparty=from_nick,
                                                    utxo_txid=bond_data["utxo_txid"],
                                                    utxo_vout=bond_data["utxo_vout"],
                                                    locktime=bond_data["locktime"],
                                                    script=bond_data["utxo_pub"],
                                                    utxo_confirmations=0,
                                                    cert_expiry=bond_data["cert_expiry"],
                                                    fidelity_bond_data=bond_data,
                                                )
                                                bonds.append(bond)

                            offer_parts = offer_line.split()
                            if len(offer_parts) < 6:
                                logger.warning(
                                    f"Offer from {from_nick} has {len(offer_parts)} parts, need 6"
                                )
                                continue

                            oid = int(offer_parts[1])
                            minsize = int(offer_parts[2])
                            maxsize = int(offer_parts[3])
                            txfee = int(offer_parts[4])
                            cjfee_str = offer_parts[5]

                            if offer_type in ["sw0absoffer", "swabsoffer"]:
                                cjfee = str(int(cjfee_str))
                            else:
                                cjfee = str(Decimal(cjfee_str))

                            offer = Offer(
                                counterparty=from_nick,
                                oid=oid,
                                ordertype=OfferType(offer_type),
                                minsize=minsize,
                                maxsize=maxsize,
                                txfee=txfee,
                                cjfee=cjfee,
                                fidelity_bond_value=0,
                                neutrino_compat=neutrino_compat,
                                features=self.peer_features.get(from_nick, {}),
                            )
                            offers.append(offer)

                            if bond_data:
                                offer.fidelity_bond_data = bond_data

                            logger.debug(
                                f"Parsed {offer_type} from {from_nick}: "
                                f"oid={oid}, size={minsize}-{maxsize}, fee={cjfee}, "
                                f"has_bond={bond_data is not None}, neutrino_compat={neutrino_compat}"
                            )
                            parsed = True
                        except Exception as e:
                            logger.warning(f"Failed to parse {offer_type} from {from_nick}: {e}")
                        break

                if not parsed:
                    logger.debug(f"Message not an offer: {rest[:50]}...")

            except Exception as e:
                logger.warning(f"Failed to process message: {e}")
                continue

        # NOTE: We trust the directory's orderbook as authoritative.
        # If a maker has an offer in the directory, they are considered online.
        # The directory server maintains the connection state and removes offers
        # when makers disconnect. Peerlist responses may be delayed or unavailable,
        # so we don't filter offers based on peerlist presence.
        #
        # This prevents incorrectly rejecting valid offers from active makers
        # whose peerlist entry hasn't been received yet.

        logger.info(
            f"Fetched {len(offers)} offers and {len(bonds)} fidelity bonds from "
            f"{self.host}:{self.port}"
        )
        return offers, bonds

    async def send_public_message(self, message: str) -> None:
        """
        Send a public message to all peers.

        Args:
            message: Message to broadcast
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        pubmsg = {
            "type": MessageType.PUBMSG.value,
            "line": f"{self.nick}!PUBLIC!{message}",
        }
        await self.connection.send(json.dumps(pubmsg).encode("utf-8"))

    async def send_private_message(self, recipient: str, command: str, data: str) -> None:
        """
        Send a signed private message to a specific peer.

        JoinMarket requires all private messages to be signed with the sender's
        nick private key. The signature is appended to the message:
        Format: "!<command> <data> <pubkey_hex> <signature>"

        The message-to-sign is: data + hostid (to prevent replay attacks)
        Note: Only the data is signed, NOT the command prefix.

        Args:
            recipient: Target peer nick
            command: Command name (without ! prefix, e.g., 'fill', 'auth', 'tx')
            data: Command arguments to send (will be signed)
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        # Sign just the data (not the command) with our nick identity
        # Reference: rawmessage = ' '.join(message[1:].split(' ')[1:-2])
        # This means they extract [1:-2] which is the args, not the command
        # So we sign: data + hostid
        # IMPORTANT: Always use ONION_HOSTID ("onion-network"), NOT the directory hostname.
        # The reference implementation uses a fixed hostid for ALL onion message channels
        # (see jmdaemon/onionmc.py line 635: self.hostid = "onion-network")
        signed_data = self.nick_identity.sign_message(data, ONION_HOSTID)

        # JoinMarket message format: from_nick!to_nick!command <args>
        # The COMMAND_PREFIX ("!") is used ONLY as a field separator between
        # from_nick, to_nick, and the message content. The command itself
        # does NOT have a "!" prefix.
        # Format: "<command> <signed_data>" where signed_data = "<data> <pubkey_hex> <sig_b64>"
        full_message = f"{command} {signed_data}"

        privmsg = {
            "type": MessageType.PRIVMSG.value,
            "line": f"{self.nick}!{recipient}!{full_message}",
        }
        await self.connection.send(json.dumps(privmsg).encode("utf-8"))

    async def close(self) -> None:
        """Close the connection to the directory server."""
        if self.connection:
            try:
                # NOTE: We skip sending DISCONNECT (801) because the reference implementation
                # crashes on unhandled control messages.
                pass
            except Exception:
                pass
            finally:
                await self.connection.close()
                self.connection = None

    def stop(self) -> None:
        """Stop continuous listening."""
        self.running = False

    async def listen_continuously(self, request_orderbook: bool = True) -> None:
        """
        Continuously listen for messages and update internal offer/bond caches.

        This method runs indefinitely until stop() is called or connection is lost.
        Used by orderbook_watcher and maker to maintain live orderbook state.

        Args:
            request_orderbook: If True, send !orderbook request on startup to get
                current offers from makers. Set to False for maker bots that don't
                need to receive other offers.
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        logger.info(f"Starting continuous listening on {self.host}:{self.port}")
        self.running = True

        # Fetch peerlist with features to populate peer_features cache
        # This allows us to know which features each maker supports
        # Note: This may return empty if directory doesn't support GETPEERLIST (reference impl)
        try:
            await self.get_peerlist_with_features()
            if self._peerlist_supported:
                logger.info(f"Populated peer_features cache with {len(self.peer_features)} peers")
            else:
                logger.info(
                    "Directory doesn't support GETPEERLIST - peer features will be "
                    "learned from offer messages"
                )
        except Exception as e:
            logger.warning(f"Failed to fetch peerlist with features: {e}")

        # Request current orderbook from makers
        if request_orderbook:
            try:
                pubmsg = {
                    "type": MessageType.PUBMSG.value,
                    "line": f"{self.nick}!PUBLIC!!orderbook",
                }
                await self.connection.send(json.dumps(pubmsg).encode("utf-8"))
                logger.info("Sent !orderbook request to get current offers")
            except Exception as e:
                logger.warning(f"Failed to send !orderbook request: {e}")

        # Track when we last sent an orderbook request (to avoid spamming)
        import time

        last_orderbook_request = time.time()
        orderbook_request_min_interval = 60.0  # Minimum 60 seconds between requests

        while self.running:
            try:
                # First check if we have buffered messages from previous operations
                # (e.g., messages received while waiting for PEERLIST)
                if not self._message_buffer.empty():
                    message = await self._message_buffer.get()
                    logger.trace("Processing buffered message from queue")
                else:
                    # Read next message with timeout
                    data = await asyncio.wait_for(self.connection.receive(), timeout=5.0)

                    if not data:
                        logger.warning(f"Connection to {self.host}:{self.port} closed")
                        break

                    message = json.loads(data.decode("utf-8"))
                msg_type = message.get("type")
                line = message.get("line", "")

                # Handle PEERLIST responses (from periodic or automatic requests)
                if msg_type == MessageType.PEERLIST.value:
                    try:
                        self._handle_peerlist_response(line)
                    except Exception as e:
                        logger.debug(f"Failed to process PEERLIST: {e}")
                    continue

                # Process PUBMSG and PRIVMSG to update offers/bonds cache
                # Reference implementation sends offer responses to !orderbook via PRIVMSG
                if msg_type in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value):
                    try:
                        parts = line.split(COMMAND_PREFIX)
                        if len(parts) >= 3:
                            from_nick = parts[0]
                            to_nick = parts[1]
                            rest = COMMAND_PREFIX.join(parts[2:])

                            # Accept PUBLIC broadcasts or messages addressed to us
                            if to_nick == "PUBLIC" or to_nick == self.nick:
                                # If we don't have features for this peer, it's a new peer.
                                # Track them with empty features for now - we'll get their features
                                # from the initial peerlist or from their offer messages
                                is_new_peer = from_nick not in self.peer_features
                                current_time = time.time()

                                if is_new_peer:
                                    # Track new peer - merge empty features (will be a no-op
                                    # if we already know their features from another source)
                                    # Features will be populated from offer messages or peerlist
                                    self._merge_peer_features(from_nick, {})
                                    logger.debug(f"Discovered new peer: {from_nick}")

                                    # If directory supports peerlist_features, request updated peerlist
                                    # to get this peer's features immediately
                                    if (
                                        self.directory_peerlist_features
                                        and self._peerlist_supported
                                    ):
                                        try:
                                            # Request peerlist to get features for new peer
                                            # This is a background task - don't block message processing
                                            asyncio.create_task(
                                                self._refresh_peerlist_for_new_peer()
                                            )
                                        except Exception as e:
                                            logger.debug(
                                                f"Failed to request peerlist for new peer: {e}"
                                            )

                                    # Request orderbook from new peer (rate-limited)
                                    if (
                                        request_orderbook
                                        and current_time - last_orderbook_request
                                        > orderbook_request_min_interval
                                    ):
                                        try:
                                            pubmsg = {
                                                "type": MessageType.PUBMSG.value,
                                                "line": f"{self.nick}!PUBLIC!!orderbook",
                                            }
                                            await self.connection.send(
                                                json.dumps(pubmsg).encode("utf-8")
                                            )
                                            last_orderbook_request = current_time
                                            logger.info(
                                                f"Sent !orderbook request for new peer {from_nick}"
                                            )
                                        except Exception as e:
                                            logger.debug(f"Failed to send !orderbook: {e}")

                                # Parse offer announcements
                                for offer_type_prefix in [
                                    "sw0reloffer",
                                    "sw0absoffer",
                                    "swreloffer",
                                    "swabsoffer",
                                ]:
                                    if rest.startswith(offer_type_prefix):
                                        # Separate offer from fidelity bond data
                                        rest_parts = rest.split(COMMAND_PREFIX, 1)
                                        offer_line = rest_parts[0].strip()

                                        # Parse fidelity bond if present
                                        bond_data = None
                                        if len(rest_parts) > 1 and rest_parts[1].startswith(
                                            "tbond "
                                        ):
                                            bond_parts = rest_parts[1][6:].split()
                                            if bond_parts:
                                                bond_proof_b64 = bond_parts[0]
                                                # For PUBLIC announcements, maker uses their own nick
                                                # as taker_nick when creating the proof.
                                                # For PRIVMSG (response to !orderbook), maker signs
                                                # for the recipient (us).
                                                taker_nick_for_proof = (
                                                    from_nick if to_nick == "PUBLIC" else to_nick
                                                )
                                                bond_data = parse_fidelity_bond_proof(
                                                    bond_proof_b64, from_nick, taker_nick_for_proof
                                                )
                                                if bond_data:
                                                    logger.debug(
                                                        f"Parsed fidelity bond from {from_nick}: "
                                                        f"txid={bond_data['utxo_txid'][:16]}..., "
                                                        f"locktime={bond_data['locktime']}"
                                                    )
                                                    # Store bond in bonds cache
                                                    utxo_str = (
                                                        f"{bond_data['utxo_txid']}:"
                                                        f"{bond_data['utxo_vout']}"
                                                    )
                                                    bond = FidelityBond(
                                                        counterparty=from_nick,
                                                        utxo_txid=bond_data["utxo_txid"],
                                                        utxo_vout=bond_data["utxo_vout"],
                                                        locktime=bond_data["locktime"],
                                                        script=bond_data["utxo_pub"],
                                                        utxo_confirmations=0,
                                                        cert_expiry=bond_data["cert_expiry"],
                                                        fidelity_bond_data=bond_data,
                                                    )
                                                    self.bonds[utxo_str] = bond

                                        offer_parts = offer_line.split()
                                        if len(offer_parts) >= 6:
                                            try:
                                                oid = int(offer_parts[1])
                                                minsize = int(offer_parts[2])
                                                maxsize = int(offer_parts[3])
                                                txfee = int(offer_parts[4])
                                                cjfee_str = offer_parts[5]

                                                if offer_type_prefix in [
                                                    "sw0absoffer",
                                                    "swabsoffer",
                                                ]:
                                                    cjfee = str(int(cjfee_str))
                                                else:
                                                    cjfee = str(Decimal(cjfee_str))

                                                offer = Offer(
                                                    counterparty=from_nick,
                                                    oid=oid,
                                                    ordertype=OfferType(offer_type_prefix),
                                                    minsize=minsize,
                                                    maxsize=maxsize,
                                                    txfee=txfee,
                                                    cjfee=cjfee,
                                                    fidelity_bond_value=0,
                                                    fidelity_bond_data=bond_data,
                                                    features=self.peer_features.get(from_nick, {}),
                                                )

                                                # Extract bond UTXO key for deduplication
                                                bond_utxo_key: str | None = None
                                                if bond_data:
                                                    bond_utxo_key = (
                                                        f"{bond_data['utxo_txid']}:"
                                                        f"{bond_data['utxo_vout']}"
                                                    )

                                                # Update cache using tuple key
                                                offer_key = (from_nick, oid)
                                                self._store_offer(offer_key, offer, bond_utxo_key)

                                                # Track this peer as "known" even if peerlist didn't
                                                # return features. This prevents re-triggering new peer
                                                # logic for every message from this peer.
                                                if from_nick not in self.peer_features:
                                                    self.peer_features[from_nick] = {}

                                                logger.debug(
                                                    f"Updated offer cache: {from_nick} "
                                                    f"{offer_type_prefix} oid={oid}"
                                                    + (" (with bond)" if bond_data else "")
                                                )
                                            except Exception as e:
                                                logger.debug(f"Failed to parse offer update: {e}")
                                        break
                    except Exception as e:
                        logger.debug(f"Failed to process PUBMSG: {e}")

            except TimeoutError:
                continue
            except asyncio.CancelledError:
                logger.info(f"Continuous listening on {self.host}:{self.port} cancelled")
                break
            except Exception as e:
                logger.error(f"Error in continuous listening: {e}")
                if self.on_disconnect:
                    self.on_disconnect()
                break

        self.running = False
        logger.info(f"Stopped continuous listening on {self.host}:{self.port}")

    def _store_offer(
        self,
        offer_key: tuple[str, int],
        offer: Offer,
        bond_utxo_key: str | None = None,
    ) -> None:
        """
        Store an offer with timestamp and handle bond-based deduplication.

        When a maker restarts with a new nick but the same fidelity bond, we need to
        remove the old offer(s) associated with that bond to prevent duplicates.

        Args:
            offer_key: Tuple of (counterparty, oid)
            offer: The offer to store
            bond_utxo_key: Bond UTXO key (txid:vout) if offer has a fidelity bond
        """
        current_time = time.time()

        # If this offer has a fidelity bond, check for and remove old offers with same bond
        if bond_utxo_key:
            # Get all offer keys that previously used this bond
            old_offer_keys = self._bond_to_offers.get(bond_utxo_key, set()).copy()

            # Remove old offers from DIFFERENT makers using same bond (maker restart scenario)
            # Keep multiple offers from SAME maker (same counterparty, different oids)
            for old_key in old_offer_keys:
                if (
                    old_key != offer_key
                    and old_key in self.offers
                    and old_key[0] != offer_key[0]  # Different counterparty
                ):
                    logger.debug(
                        f"Removing stale offer from {old_key[0]} oid={old_key[1]} - "
                        f"same bond UTXO now used by {offer_key[0]}"
                    )
                    del self.offers[old_key]

            # Update bond -> offers mapping: add this offer to the set
            if bond_utxo_key not in self._bond_to_offers:
                self._bond_to_offers[bond_utxo_key] = set()
            self._bond_to_offers[bond_utxo_key].add(offer_key)
        else:
            # Remove this offer from any previous bond mapping
            old_offer_data = self.offers.get(offer_key)
            if old_offer_data and old_offer_data.bond_utxo_key:
                old_bond_key = old_offer_data.bond_utxo_key
                if old_bond_key in self._bond_to_offers:
                    self._bond_to_offers[old_bond_key].discard(offer_key)

        # Store the new offer with timestamp
        self.offers[offer_key] = OfferWithTimestamp(
            offer=offer, received_at=current_time, bond_utxo_key=bond_utxo_key
        )

    def _update_offer_features(self, nick: str, features: dict[str, bool]) -> int:
        """
        Update features on all cached offers for a specific peer.

        This is called when we receive updated feature information from peerlist,
        ensuring that offers stored before features were known get updated.

        Args:
            nick: The nick to update features for
            features: New features dict to apply

        Returns:
            Number of offers updated
        """
        updated = 0
        for key, offer_ts in self.offers.items():
            if key[0] == nick:
                # Update features on the cached offer
                # Merge new features with any existing ones (new features take precedence)
                for feature, value in features.items():
                    if value:  # Only set true features
                        offer_ts.offer.features[feature] = value
                updated += 1

        if updated > 0:
            logger.debug(
                f"Updated features on {updated} cached offer(s) for {nick}: "
                f"{[k for k, v in features.items() if v]}"
            )

        return updated

    def _merge_peer_features(self, nick: str, new_features: dict[str, bool]) -> None:
        """
        Merge new features into the peer_features cache for a nick.

        Features are cumulative - once a peer advertises a feature, we keep it.
        This prevents losing features when receiving updates from directories
        that don't support peerlist_features.

        Args:
            nick: The peer's nick
            new_features: New features dict to merge (only True values are added)
        """
        existing = self.peer_features.get(nick, {})
        for feature, value in new_features.items():
            if value:  # Only set true features, never downgrade
                existing[feature] = value
        self.peer_features[nick] = existing

    def remove_offers_for_nick(self, nick: str) -> int:
        """
        Remove all offers from a specific nick (e.g., when nick goes offline).

        This is the equivalent of the reference implementation's on_nick_leave callback.

        Args:
            nick: The nick to remove offers for

        Returns:
            Number of offers removed
        """
        keys_to_remove = [key for key in self.offers if key[0] == nick]
        removed = 0

        for key in keys_to_remove:
            offer_data = self.offers.pop(key, None)
            if offer_data:
                removed += 1
                # Clean up bond mapping
                if offer_data.bond_utxo_key and offer_data.bond_utxo_key in self._bond_to_offers:
                    self._bond_to_offers[offer_data.bond_utxo_key].discard(key)

        if removed > 0:
            logger.info(f"Removed {removed} offers for nick {nick} (left/offline)")

        # Also remove from peer_features and active_peers
        self.peer_features.pop(nick, None)
        self._active_peers.pop(nick, None)

        # Remove any bonds from this nick
        bonds_to_remove = [k for k, v in self.bonds.items() if v.counterparty == nick]
        for bond_key in bonds_to_remove:
            del self.bonds[bond_key]

        return removed

    async def _refresh_peerlist_for_new_peer(self) -> None:
        """
        Refresh peerlist to get features for newly discovered peers.

        This is called as a background task when a new peer is discovered
        to immediately fetch their features from the directory's peerlist.
        """
        try:
            # Small delay to batch multiple new peer discoveries
            await asyncio.sleep(2.0)

            # Request peerlist - this will update peer_features
            peers = await self.get_peerlist_with_features()
            if peers:
                logger.debug(
                    f"Refreshed peerlist for new peer discovery: {len(peers)} active peers"
                )
        except Exception as e:
            logger.debug(f"Failed to refresh peerlist for new peer: {e}")

    def get_active_nicks(self) -> set[str]:
        """Get set of nicks from the last peerlist update."""
        return set(self._active_peers.keys())

    def cleanup_stale_offers(self, max_age_seconds: float = 1800.0) -> int:
        """
        Remove offers that haven't been re-announced within the staleness threshold.

        This is a fallback cleanup mechanism for directories that don't support
        GETPEERLIST (reference implementation). For offers with fidelity bonds,
        bond-based deduplication handles most cases, but this catches offers
        from makers that silently went offline.

        Args:
            max_age_seconds: Maximum age in seconds before an offer is considered stale.
                Default is 30 minutes (1800 seconds).

        Returns:
            Number of stale offers removed
        """
        current_time = time.time()
        stale_keys: list[tuple[str, int]] = []

        for key, offer_data in self.offers.items():
            age = current_time - offer_data.received_at
            if age > max_age_seconds:
                stale_keys.append(key)

        removed = 0
        for key in stale_keys:
            removed_offer: OfferWithTimestamp | None = self.offers.pop(key, None)
            if removed_offer:
                removed += 1
                # Clean up bond mapping
                if (
                    removed_offer.bond_utxo_key
                    and removed_offer.bond_utxo_key in self._bond_to_offers
                ):
                    self._bond_to_offers[removed_offer.bond_utxo_key].discard(key)
                logger.debug(
                    f"Removed stale offer from {key[0]} oid={key[1]} "
                    f"(age={current_time - removed_offer.received_at:.0f}s)"
                )

        if removed > 0:
            logger.info(f"Cleaned up {removed} stale offers (older than {max_age_seconds}s)")

        return removed

    def get_current_offers(self) -> list[Offer]:
        """Get the current list of cached offers."""
        return [offer_data.offer for offer_data in self.offers.values()]

    def get_offers_with_timestamps(self) -> list[OfferWithTimestamp]:
        """Get offers with their timestamp metadata."""
        return list(self.offers.values())

    def get_current_bonds(self) -> list[FidelityBond]:
        """Get the current list of cached fidelity bonds."""
        return list(self.bonds.values())

    def supports_extended_utxo_format(self) -> bool:
        """
        Check if we should use extended UTXO format with this directory.

        Extended format (txid:vout:scriptpubkey:blockheight) is used when
        both sides advertise neutrino_compat feature. Protocol version
        is not checked - features are negotiated independently.

        Returns:
            True if extended UTXO format should be used
        """
        return self.neutrino_compat and self.directory_neutrino_compat

    def get_negotiated_version(self) -> int:
        """
        Get the negotiated protocol version.

        Returns:
            Negotiated version (always 5 with feature-based approach)
        """
        return self.negotiated_version if self.negotiated_version is not None else JM_VERSION

Client for connecting to JoinMarket directory servers.

Supports: - Direct TCP connections (for local/dev) - Tor connections (for .onion addresses) - Handshake protocol - Peerlist fetching - Orderbook fetching - Continuous listening for updates

Initialize DirectoryClient.

Args

host
Directory server hostname or .onion address
port
Directory server port
network
Bitcoin network (mainnet, testnet, signet, regtest)
nick_identity
NickIdentity for message signing (generated if None)
location
Our location string (onion address or NOT-SERVING-ONION)
socks_host
SOCKS proxy host for Tor
socks_port
SOCKS proxy port for Tor
timeout
Connection timeout in seconds
max_message_size
Maximum message size in bytes
on_disconnect
Callback when connection drops
neutrino_compat
Advertise support for Neutrino-compatible UTXO metadata
peerlist_timeout
Timeout for first PEERLIST chunk (default 60s, subsequent chunks use 5s)

Methods

def cleanup_stale_offers(self, max_age_seconds: float = 1800.0) ‑> int
Expand source code
def cleanup_stale_offers(self, max_age_seconds: float = 1800.0) -> int:
    """
    Remove offers that haven't been re-announced within the staleness threshold.

    This is a fallback cleanup mechanism for directories that don't support
    GETPEERLIST (reference implementation). For offers with fidelity bonds,
    bond-based deduplication handles most cases, but this catches offers
    from makers that silently went offline.

    Args:
        max_age_seconds: Maximum age in seconds before an offer is considered stale.
            Default is 30 minutes (1800 seconds).

    Returns:
        Number of stale offers removed
    """
    current_time = time.time()
    stale_keys: list[tuple[str, int]] = []

    for key, offer_data in self.offers.items():
        age = current_time - offer_data.received_at
        if age > max_age_seconds:
            stale_keys.append(key)

    removed = 0
    for key in stale_keys:
        removed_offer: OfferWithTimestamp | None = self.offers.pop(key, None)
        if removed_offer:
            removed += 1
            # Clean up bond mapping
            if (
                removed_offer.bond_utxo_key
                and removed_offer.bond_utxo_key in self._bond_to_offers
            ):
                self._bond_to_offers[removed_offer.bond_utxo_key].discard(key)
            logger.debug(
                f"Removed stale offer from {key[0]} oid={key[1]} "
                f"(age={current_time - removed_offer.received_at:.0f}s)"
            )

    if removed > 0:
        logger.info(f"Cleaned up {removed} stale offers (older than {max_age_seconds}s)")

    return removed

Remove offers that haven't been re-announced within the staleness threshold.

This is a fallback cleanup mechanism for directories that don't support GETPEERLIST (reference implementation). For offers with fidelity bonds, bond-based deduplication handles most cases, but this catches offers from makers that silently went offline.

Args

max_age_seconds
Maximum age in seconds before an offer is considered stale. Default is 30 minutes (1800 seconds).

Returns

Number of stale offers removed

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Close the connection to the directory server."""
    if self.connection:
        try:
            # NOTE: We skip sending DISCONNECT (801) because the reference implementation
            # crashes on unhandled control messages.
            pass
        except Exception:
            pass
        finally:
            await self.connection.close()
            self.connection = None

Close the connection to the directory server.

async def connect(self) ‑> None
Expand source code
async def connect(self) -> None:
    """Connect to the directory server and perform handshake."""
    try:
        logger.debug(f"DirectoryClient.connect: connecting to {self.host}:{self.port}")
        if not self.host.endswith(".onion"):
            self.connection = await connect_direct(
                self.host,
                self.port,
                self.max_message_size,
                self.timeout,
            )
            logger.debug("DirectoryClient.connect: direct connection established")
        else:
            self.connection = await connect_via_tor(
                self.host,
                self.port,
                self.socks_host,
                self.socks_port,
                self.max_message_size,
                self.timeout,
            )
            logger.debug("DirectoryClient.connect: tor connection established")
        logger.debug("DirectoryClient.connect: starting handshake")
        await self._handshake()
        logger.debug("DirectoryClient.connect: handshake complete")
    except Exception as e:
        logger.error(f"Failed to connect to {self.host}:{self.port}: {e}", exc_info=True)
        # Clean up connection if handshake failed
        if self.connection:
            with contextlib.suppress(Exception):
                await self.connection.close()
            self.connection = None
        raise DirectoryClientError(f"Connection failed: {e}") from e

Connect to the directory server and perform handshake.

async def fetch_orderbooks(self) ‑> tuple[list[Offer], list[FidelityBond]]
Expand source code
async def fetch_orderbooks(self) -> tuple[list[Offer], list[FidelityBond]]:
    """
    Fetch orderbooks from all connected peers.

    Trusts the directory's orderbook as authoritative - if a maker has an offer
    in the directory, they are considered online. The directory server maintains
    the connection state and removes offers when makers disconnect.

    Returns:
        Tuple of (offers, fidelity_bonds)
    """
    # Use get_peerlist_with_features to populate peer_features cache for neutrino_compat
    # detection. The peerlist itself is not used for offer filtering.
    peers_with_features = await self.get_peerlist_with_features()
    offers: list[Offer] = []
    bonds: list[FidelityBond] = []
    bond_utxo_set: set[str] = set()

    # Log peer count for visibility (but don't filter based on peerlist)
    if peers_with_features:
        logger.info(f"Found {len(peers_with_features)} peers on {self.host}:{self.port}")

    if not self.connection:
        raise DirectoryClientError("Not connected")

    pubmsg = {
        "type": MessageType.PUBMSG.value,
        "line": f"{self.nick}!PUBLIC!!orderbook",
    }
    await self.connection.send(json.dumps(pubmsg).encode("utf-8"))
    logger.debug("Sent !orderbook broadcast to PUBLIC")

    # Based on empirical testing with the main JoinMarket directory server over Tor,
    # the response time distribution shows significant delays:
    # - Median: ~38s (50% of offers)
    # - 75th percentile: ~65s
    # - 90th percentile: ~93s
    # - 95th percentile: ~101s
    # - 99th percentile: ~115s
    # - Max observed: ~119s
    # Using 120s (95th percentile + 20% buffer) ensures we capture ~95% of all offers.
    # The wide distribution is due to Tor latency and maker response times.
    #
    # For testing, this can be overridden via JM_ORDERBOOK_WAIT_TIME environment variable.
    listen_duration = float(os.environ.get("JM_ORDERBOOK_WAIT_TIME", "120.0"))
    logger.info(f"Listening for offer announcements for {listen_duration} seconds...")
    messages = await self.listen_for_messages(duration=listen_duration)

    logger.info(f"Received {len(messages)} messages, parsing offers...")

    for response in messages:
        try:
            msg_type = response.get("type")
            line = response["line"]

            # Handle PEERLIST messages to keep peer features and active peers updated
            if msg_type == MessageType.PEERLIST.value:
                try:
                    self._handle_peerlist_response(line)
                    logger.debug("Processed PEERLIST during orderbook fetch")
                except Exception as e:
                    logger.debug(f"Failed to process PEERLIST: {e}")
                continue

            if msg_type not in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value):
                logger.debug(f"Skipping message type {msg_type}")
                continue

            logger.debug(f"Processing message type {msg_type}: {line[:100]}...")

            parts = line.split(COMMAND_PREFIX)
            if len(parts) < 3:
                logger.debug(f"Message has insufficient parts: {len(parts)}")
                continue

            from_nick = parts[0]
            to_nick = parts[1]
            rest = COMMAND_PREFIX.join(parts[2:])

            if not rest.strip():
                logger.debug("Empty message content")
                continue

            offer_types = ["sw0absoffer", "sw0reloffer", "swabsoffer", "swreloffer"]
            parsed = False
            for offer_type in offer_types:
                if rest.startswith(offer_type):
                    try:
                        # Split on '!' to extract flags (neutrino, tbond)
                        # Format: sw0reloffer 0 750000 790107726787 500 0.001!neutrino!tbond <proof>
                        # NOTE: !neutrino in offers is deprecated - primary detection is via
                        # handshake features. This parsing is kept for backwards compatibility.
                        rest_parts = rest.split(COMMAND_PREFIX)
                        offer_line = rest_parts[0]
                        bond_data = None
                        neutrino_compat = False

                        # Parse flags after the offer line (backwards compat for !neutrino)
                        for flag_part in rest_parts[1:]:
                            if flag_part.startswith("neutrino"):
                                neutrino_compat = True
                                logger.debug(f"Maker {from_nick} requires neutrino_compat")
                            elif flag_part.startswith("tbond "):
                                bond_parts = flag_part[6:].split()
                                if bond_parts:
                                    bond_proof_b64 = bond_parts[0]
                                    # For PRIVMSG, the maker signs with taker's actual nick
                                    # For PUBMSG, both nicks are the maker's (self-signed)
                                    is_privmsg = msg_type == MessageType.PRIVMSG.value
                                    taker_nick_for_proof = to_nick if is_privmsg else from_nick
                                    bond_data = parse_fidelity_bond_proof(
                                        bond_proof_b64, from_nick, taker_nick_for_proof
                                    )
                                    if bond_data:
                                        logger.debug(
                                            f"Parsed fidelity bond from {from_nick}: "
                                            f"txid={bond_data['utxo_txid'][:16]}..., "
                                            f"locktime={bond_data['locktime']}"
                                        )

                                        utxo_str = (
                                            f"{bond_data['utxo_txid']}:{bond_data['utxo_vout']}"
                                        )
                                        if utxo_str not in bond_utxo_set:
                                            bond_utxo_set.add(utxo_str)
                                            bond = FidelityBond(
                                                counterparty=from_nick,
                                                utxo_txid=bond_data["utxo_txid"],
                                                utxo_vout=bond_data["utxo_vout"],
                                                locktime=bond_data["locktime"],
                                                script=bond_data["utxo_pub"],
                                                utxo_confirmations=0,
                                                cert_expiry=bond_data["cert_expiry"],
                                                fidelity_bond_data=bond_data,
                                            )
                                            bonds.append(bond)

                        offer_parts = offer_line.split()
                        if len(offer_parts) < 6:
                            logger.warning(
                                f"Offer from {from_nick} has {len(offer_parts)} parts, need 6"
                            )
                            continue

                        oid = int(offer_parts[1])
                        minsize = int(offer_parts[2])
                        maxsize = int(offer_parts[3])
                        txfee = int(offer_parts[4])
                        cjfee_str = offer_parts[5]

                        if offer_type in ["sw0absoffer", "swabsoffer"]:
                            cjfee = str(int(cjfee_str))
                        else:
                            cjfee = str(Decimal(cjfee_str))

                        offer = Offer(
                            counterparty=from_nick,
                            oid=oid,
                            ordertype=OfferType(offer_type),
                            minsize=minsize,
                            maxsize=maxsize,
                            txfee=txfee,
                            cjfee=cjfee,
                            fidelity_bond_value=0,
                            neutrino_compat=neutrino_compat,
                            features=self.peer_features.get(from_nick, {}),
                        )
                        offers.append(offer)

                        if bond_data:
                            offer.fidelity_bond_data = bond_data

                        logger.debug(
                            f"Parsed {offer_type} from {from_nick}: "
                            f"oid={oid}, size={minsize}-{maxsize}, fee={cjfee}, "
                            f"has_bond={bond_data is not None}, neutrino_compat={neutrino_compat}"
                        )
                        parsed = True
                    except Exception as e:
                        logger.warning(f"Failed to parse {offer_type} from {from_nick}: {e}")
                    break

            if not parsed:
                logger.debug(f"Message not an offer: {rest[:50]}...")

        except Exception as e:
            logger.warning(f"Failed to process message: {e}")
            continue

    # NOTE: We trust the directory's orderbook as authoritative.
    # If a maker has an offer in the directory, they are considered online.
    # The directory server maintains the connection state and removes offers
    # when makers disconnect. Peerlist responses may be delayed or unavailable,
    # so we don't filter offers based on peerlist presence.
    #
    # This prevents incorrectly rejecting valid offers from active makers
    # whose peerlist entry hasn't been received yet.

    logger.info(
        f"Fetched {len(offers)} offers and {len(bonds)} fidelity bonds from "
        f"{self.host}:{self.port}"
    )
    return offers, bonds

Fetch orderbooks from all connected peers.

Trusts the directory's orderbook as authoritative - if a maker has an offer in the directory, they are considered online. The directory server maintains the connection state and removes offers when makers disconnect.

Returns

Tuple of (offers, fidelity_bonds)

def get_active_nicks(self) ‑> set[str]
Expand source code
def get_active_nicks(self) -> set[str]:
    """Get set of nicks from the last peerlist update."""
    return set(self._active_peers.keys())

Get set of nicks from the last peerlist update.

def get_current_bonds(self) ‑> list[FidelityBond]
Expand source code
def get_current_bonds(self) -> list[FidelityBond]:
    """Get the current list of cached fidelity bonds."""
    return list(self.bonds.values())

Get the current list of cached fidelity bonds.

def get_current_offers(self) ‑> list[Offer]
Expand source code
def get_current_offers(self) -> list[Offer]:
    """Get the current list of cached offers."""
    return [offer_data.offer for offer_data in self.offers.values()]

Get the current list of cached offers.

def get_negotiated_version(self) ‑> int
Expand source code
def get_negotiated_version(self) -> int:
    """
    Get the negotiated protocol version.

    Returns:
        Negotiated version (always 5 with feature-based approach)
    """
    return self.negotiated_version if self.negotiated_version is not None else JM_VERSION

Get the negotiated protocol version.

Returns

Negotiated version (always 5 with feature-based approach)

def get_offers_with_timestamps(self) ‑> list[OfferWithTimestamp]
Expand source code
def get_offers_with_timestamps(self) -> list[OfferWithTimestamp]:
    """Get offers with their timestamp metadata."""
    return list(self.offers.values())

Get offers with their timestamp metadata.

async def get_peerlist(self) ‑> list[str] | None
Expand source code
async def get_peerlist(self) -> list[str] | None:
    """
    Fetch the current list of connected peers.

    Note: Reference implementation directories do NOT support GETPEERLIST.
    This method shares peerlist support tracking with get_peerlist_with_features().

    The directory may send multiple PEERLIST messages (chunked response) to
    avoid overwhelming slow Tor connections. This method accumulates peers
    from all chunks.

    Returns:
        List of active peer nicks. Returns empty list if directory doesn't
        support GETPEERLIST. Returns None if rate-limited (use cached data).
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    # Skip if we already know this directory doesn't support GETPEERLIST
    # (only applies to directories that didn't announce peerlist_features)
    if self._peerlist_supported is False and not self.directory_peerlist_features:
        logger.debug("Skipping GETPEERLIST - directory doesn't support it")
        return []

    # Rate-limit peerlist requests to avoid spamming
    import time

    current_time = time.time()
    if current_time - self._last_peerlist_request_time < self._peerlist_min_interval:
        logger.debug(
            f"Skipping GETPEERLIST - rate limited "
            f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)"
        )
        return None

    self._last_peerlist_request_time = current_time

    getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""}
    logger.debug("Sending GETPEERLIST request")
    await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8"))

    start_time = asyncio.get_event_loop().time()

    # Timeout for waiting for the first PEERLIST response
    first_response_timeout = (
        self._peerlist_timeout if self.directory_peerlist_features else self.timeout
    )

    # Timeout between chunks - when this expires after receiving at least one
    # PEERLIST message, we know the directory has finished sending all chunks
    inter_chunk_timeout = self._peerlist_chunk_timeout

    # Accumulate peers from multiple PEERLIST chunks
    all_peers: list[str] = []
    chunks_received = 0
    got_first_response = False

    while True:
        elapsed = asyncio.get_event_loop().time() - start_time

        # Determine timeout for this receive
        if not got_first_response:
            remaining = first_response_timeout - elapsed
            if remaining <= 0:
                self._handle_peerlist_timeout()
                return []
            receive_timeout = remaining
        else:
            receive_timeout = inter_chunk_timeout

        try:
            response_data = await asyncio.wait_for(
                self.connection.receive(), timeout=receive_timeout
            )
            response = json.loads(response_data.decode("utf-8"))
            msg_type = response.get("type")

            if msg_type == MessageType.PEERLIST.value:
                got_first_response = True
                chunks_received += 1
                peerlist_str = response.get("line", "")

                # Parse this chunk
                chunk_peers: list[str] = []
                if peerlist_str:
                    for entry in peerlist_str.split(","):
                        if not entry or not entry.strip():
                            continue
                        if NICK_PEERLOCATOR_SEPARATOR not in entry:
                            logger.debug(f"Skipping metadata entry in peerlist: '{entry}'")
                            continue
                        try:
                            nick, location, disconnected, _features = parse_peerlist_entry(
                                entry
                            )
                            logger.debug(
                                f"Parsed peer: {nick} at {location}, "
                                f"disconnected={disconnected}"
                            )
                            if not disconnected:
                                chunk_peers.append(nick)
                        except ValueError as e:
                            logger.warning(f"Failed to parse peerlist entry '{entry}': {e}")
                            continue

                all_peers.extend(chunk_peers)
                logger.debug(
                    f"Received PEERLIST chunk {chunks_received} with "
                    f"{len(chunk_peers)} peers (total: {len(all_peers)})"
                )
                continue

            # Buffer unexpected messages
            logger.trace(
                f"Buffering unexpected message type {msg_type} while waiting for PEERLIST"
            )
            await self._message_buffer.put(response)

        except TimeoutError:
            if not got_first_response:
                self._handle_peerlist_timeout()
                return []
            # Inter-chunk timeout means we're done
            break

        except Exception as e:
            logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}")
            elapsed = asyncio.get_event_loop().time() - start_time
            if not got_first_response and elapsed > first_response_timeout:
                self._handle_peerlist_timeout()
                return []
            if got_first_response:
                break

    # Mark peerlist as supported since we got a valid response
    self._peerlist_supported = True
    self._peerlist_timeout_count = 0

    logger.info(f"Received {len(all_peers)} active peers from {self.host}:{self.port}")
    return all_peers

Fetch the current list of connected peers.

Note: Reference implementation directories do NOT support GETPEERLIST. This method shares peerlist support tracking with get_peerlist_with_features().

The directory may send multiple PEERLIST messages (chunked response) to avoid overwhelming slow Tor connections. This method accumulates peers from all chunks.

Returns

List of active peer nicks. Returns empty list if directory doesn't support GETPEERLIST. Returns None if rate-limited (use cached data).

async def get_peerlist_with_features(self) ‑> list[tuple[str, str, FeatureSet]]
Expand source code
async def get_peerlist_with_features(self) -> list[tuple[str, str, FeatureSet]]:
    """
    Fetch the current list of connected peers with their features.

    Uses the standard GETPEERLIST message. If the directory supports
    peerlist_features, the response will include F: suffix with features.

    Note: Reference implementation directories do NOT support GETPEERLIST.
    This method tracks whether the directory supports it and skips requests
    to unsupported directories to avoid spamming warnings in their logs.

    The directory may send multiple PEERLIST messages (chunked response) to
    avoid overwhelming slow Tor connections. This method accumulates peers
    from all chunks until no more PEERLIST messages arrive within the
    inter-chunk timeout.

    Returns:
        List of (nick, location, features) tuples for active peers.
        Features will be empty for directories that don't support peerlist_features.
        Returns empty list if directory doesn't support GETPEERLIST or is rate-limited.
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    # Skip if we already know this directory doesn't support GETPEERLIST
    # (only applies to directories that didn't announce peerlist_features)
    if self._peerlist_supported is False and not self.directory_peerlist_features:
        logger.debug("Skipping GETPEERLIST - directory doesn't support it")
        return []

    # Rate-limit peerlist requests to avoid spamming
    import time

    current_time = time.time()
    if current_time - self._last_peerlist_request_time < self._peerlist_min_interval:
        logger.debug(
            f"Skipping GETPEERLIST - rate limited "
            f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)"
        )
        return []  # Return empty - will use offers for nick tracking

    self._last_peerlist_request_time = current_time

    getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""}
    logger.debug("Sending GETPEERLIST request")
    await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8"))

    start_time = asyncio.get_event_loop().time()

    # Timeout for waiting for the first PEERLIST response
    # Use longer timeout for directories that support peerlist_features
    first_response_timeout = (
        self._peerlist_timeout if self.directory_peerlist_features else self.timeout
    )

    # Timeout between chunks - when this expires after receiving at least one
    # PEERLIST message, we know the directory has finished sending all chunks
    inter_chunk_timeout = self._peerlist_chunk_timeout

    # Accumulate peers from multiple PEERLIST chunks
    all_peers: list[tuple[str, str, FeatureSet]] = []
    chunks_received = 0
    got_first_response = False

    while True:
        elapsed = asyncio.get_event_loop().time() - start_time

        # Determine timeout for this receive
        if not got_first_response:
            # Waiting for first PEERLIST - use full timeout
            remaining = first_response_timeout - elapsed
            if remaining <= 0:
                self._handle_peerlist_timeout()
                return []
            receive_timeout = remaining
        else:
            # Already received at least one chunk - use shorter inter-chunk timeout
            receive_timeout = inter_chunk_timeout

        try:
            response_data = await asyncio.wait_for(
                self.connection.receive(), timeout=receive_timeout
            )
            response = json.loads(response_data.decode("utf-8"))
            msg_type = response.get("type")

            if msg_type == MessageType.PEERLIST.value:
                got_first_response = True
                chunks_received += 1
                peerlist_str = response.get("line", "")
                chunk_peers = self._handle_peerlist_response(peerlist_str)
                all_peers.extend(chunk_peers)
                logger.debug(
                    f"Received PEERLIST chunk {chunks_received} with "
                    f"{len(chunk_peers)} peers (total: {len(all_peers)})"
                )
                # Continue to check for more chunks
                continue

            # Buffer unexpected messages (like PUBMSG offers) for later processing
            logger.trace(
                f"Buffering unexpected message type {msg_type} while waiting for PEERLIST"
            )
            await self._message_buffer.put(response)

        except TimeoutError:
            if not got_first_response:
                # Never received any PEERLIST - this is a real timeout
                self._handle_peerlist_timeout()
                return []
            # Received at least one chunk, inter-chunk timeout means we're done
            logger.debug(
                f"Peerlist complete: received {len(all_peers)} peers "
                f"in {chunks_received} chunks"
            )
            break

        except Exception as e:
            logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}")
            elapsed = asyncio.get_event_loop().time() - start_time
            if not got_first_response and elapsed > first_response_timeout:
                self._handle_peerlist_timeout()
                return []
            # If we already have some data, return what we have
            if got_first_response:
                break

    # Success - reset timeout counter and mark as supported
    self._peerlist_timeout_count = 0
    self._peerlist_supported = True
    return all_peers

Fetch the current list of connected peers with their features.

Uses the standard GETPEERLIST message. If the directory supports peerlist_features, the response will include F: suffix with features.

Note: Reference implementation directories do NOT support GETPEERLIST. This method tracks whether the directory supports it and skips requests to unsupported directories to avoid spamming warnings in their logs.

The directory may send multiple PEERLIST messages (chunked response) to avoid overwhelming slow Tor connections. This method accumulates peers from all chunks until no more PEERLIST messages arrive within the inter-chunk timeout.

Returns

List of (nick, location, features) tuples for active peers. Features will be empty for directories that don't support peerlist_features. Returns empty list if directory doesn't support GETPEERLIST or is rate-limited.

async def listen_continuously(self, request_orderbook: bool = True) ‑> None
Expand source code
async def listen_continuously(self, request_orderbook: bool = True) -> None:
    """
    Continuously listen for messages and update internal offer/bond caches.

    This method runs indefinitely until stop() is called or connection is lost.
    Used by orderbook_watcher and maker to maintain live orderbook state.

    Args:
        request_orderbook: If True, send !orderbook request on startup to get
            current offers from makers. Set to False for maker bots that don't
            need to receive other offers.
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    logger.info(f"Starting continuous listening on {self.host}:{self.port}")
    self.running = True

    # Fetch peerlist with features to populate peer_features cache
    # This allows us to know which features each maker supports
    # Note: This may return empty if directory doesn't support GETPEERLIST (reference impl)
    try:
        await self.get_peerlist_with_features()
        if self._peerlist_supported:
            logger.info(f"Populated peer_features cache with {len(self.peer_features)} peers")
        else:
            logger.info(
                "Directory doesn't support GETPEERLIST - peer features will be "
                "learned from offer messages"
            )
    except Exception as e:
        logger.warning(f"Failed to fetch peerlist with features: {e}")

    # Request current orderbook from makers
    if request_orderbook:
        try:
            pubmsg = {
                "type": MessageType.PUBMSG.value,
                "line": f"{self.nick}!PUBLIC!!orderbook",
            }
            await self.connection.send(json.dumps(pubmsg).encode("utf-8"))
            logger.info("Sent !orderbook request to get current offers")
        except Exception as e:
            logger.warning(f"Failed to send !orderbook request: {e}")

    # Track when we last sent an orderbook request (to avoid spamming)
    import time

    last_orderbook_request = time.time()
    orderbook_request_min_interval = 60.0  # Minimum 60 seconds between requests

    while self.running:
        try:
            # First check if we have buffered messages from previous operations
            # (e.g., messages received while waiting for PEERLIST)
            if not self._message_buffer.empty():
                message = await self._message_buffer.get()
                logger.trace("Processing buffered message from queue")
            else:
                # Read next message with timeout
                data = await asyncio.wait_for(self.connection.receive(), timeout=5.0)

                if not data:
                    logger.warning(f"Connection to {self.host}:{self.port} closed")
                    break

                message = json.loads(data.decode("utf-8"))
            msg_type = message.get("type")
            line = message.get("line", "")

            # Handle PEERLIST responses (from periodic or automatic requests)
            if msg_type == MessageType.PEERLIST.value:
                try:
                    self._handle_peerlist_response(line)
                except Exception as e:
                    logger.debug(f"Failed to process PEERLIST: {e}")
                continue

            # Process PUBMSG and PRIVMSG to update offers/bonds cache
            # Reference implementation sends offer responses to !orderbook via PRIVMSG
            if msg_type in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value):
                try:
                    parts = line.split(COMMAND_PREFIX)
                    if len(parts) >= 3:
                        from_nick = parts[0]
                        to_nick = parts[1]
                        rest = COMMAND_PREFIX.join(parts[2:])

                        # Accept PUBLIC broadcasts or messages addressed to us
                        if to_nick == "PUBLIC" or to_nick == self.nick:
                            # If we don't have features for this peer, it's a new peer.
                            # Track them with empty features for now - we'll get their features
                            # from the initial peerlist or from their offer messages
                            is_new_peer = from_nick not in self.peer_features
                            current_time = time.time()

                            if is_new_peer:
                                # Track new peer - merge empty features (will be a no-op
                                # if we already know their features from another source)
                                # Features will be populated from offer messages or peerlist
                                self._merge_peer_features(from_nick, {})
                                logger.debug(f"Discovered new peer: {from_nick}")

                                # If directory supports peerlist_features, request updated peerlist
                                # to get this peer's features immediately
                                if (
                                    self.directory_peerlist_features
                                    and self._peerlist_supported
                                ):
                                    try:
                                        # Request peerlist to get features for new peer
                                        # This is a background task - don't block message processing
                                        asyncio.create_task(
                                            self._refresh_peerlist_for_new_peer()
                                        )
                                    except Exception as e:
                                        logger.debug(
                                            f"Failed to request peerlist for new peer: {e}"
                                        )

                                # Request orderbook from new peer (rate-limited)
                                if (
                                    request_orderbook
                                    and current_time - last_orderbook_request
                                    > orderbook_request_min_interval
                                ):
                                    try:
                                        pubmsg = {
                                            "type": MessageType.PUBMSG.value,
                                            "line": f"{self.nick}!PUBLIC!!orderbook",
                                        }
                                        await self.connection.send(
                                            json.dumps(pubmsg).encode("utf-8")
                                        )
                                        last_orderbook_request = current_time
                                        logger.info(
                                            f"Sent !orderbook request for new peer {from_nick}"
                                        )
                                    except Exception as e:
                                        logger.debug(f"Failed to send !orderbook: {e}")

                            # Parse offer announcements
                            for offer_type_prefix in [
                                "sw0reloffer",
                                "sw0absoffer",
                                "swreloffer",
                                "swabsoffer",
                            ]:
                                if rest.startswith(offer_type_prefix):
                                    # Separate offer from fidelity bond data
                                    rest_parts = rest.split(COMMAND_PREFIX, 1)
                                    offer_line = rest_parts[0].strip()

                                    # Parse fidelity bond if present
                                    bond_data = None
                                    if len(rest_parts) > 1 and rest_parts[1].startswith(
                                        "tbond "
                                    ):
                                        bond_parts = rest_parts[1][6:].split()
                                        if bond_parts:
                                            bond_proof_b64 = bond_parts[0]
                                            # For PUBLIC announcements, maker uses their own nick
                                            # as taker_nick when creating the proof.
                                            # For PRIVMSG (response to !orderbook), maker signs
                                            # for the recipient (us).
                                            taker_nick_for_proof = (
                                                from_nick if to_nick == "PUBLIC" else to_nick
                                            )
                                            bond_data = parse_fidelity_bond_proof(
                                                bond_proof_b64, from_nick, taker_nick_for_proof
                                            )
                                            if bond_data:
                                                logger.debug(
                                                    f"Parsed fidelity bond from {from_nick}: "
                                                    f"txid={bond_data['utxo_txid'][:16]}..., "
                                                    f"locktime={bond_data['locktime']}"
                                                )
                                                # Store bond in bonds cache
                                                utxo_str = (
                                                    f"{bond_data['utxo_txid']}:"
                                                    f"{bond_data['utxo_vout']}"
                                                )
                                                bond = FidelityBond(
                                                    counterparty=from_nick,
                                                    utxo_txid=bond_data["utxo_txid"],
                                                    utxo_vout=bond_data["utxo_vout"],
                                                    locktime=bond_data["locktime"],
                                                    script=bond_data["utxo_pub"],
                                                    utxo_confirmations=0,
                                                    cert_expiry=bond_data["cert_expiry"],
                                                    fidelity_bond_data=bond_data,
                                                )
                                                self.bonds[utxo_str] = bond

                                    offer_parts = offer_line.split()
                                    if len(offer_parts) >= 6:
                                        try:
                                            oid = int(offer_parts[1])
                                            minsize = int(offer_parts[2])
                                            maxsize = int(offer_parts[3])
                                            txfee = int(offer_parts[4])
                                            cjfee_str = offer_parts[5]

                                            if offer_type_prefix in [
                                                "sw0absoffer",
                                                "swabsoffer",
                                            ]:
                                                cjfee = str(int(cjfee_str))
                                            else:
                                                cjfee = str(Decimal(cjfee_str))

                                            offer = Offer(
                                                counterparty=from_nick,
                                                oid=oid,
                                                ordertype=OfferType(offer_type_prefix),
                                                minsize=minsize,
                                                maxsize=maxsize,
                                                txfee=txfee,
                                                cjfee=cjfee,
                                                fidelity_bond_value=0,
                                                fidelity_bond_data=bond_data,
                                                features=self.peer_features.get(from_nick, {}),
                                            )

                                            # Extract bond UTXO key for deduplication
                                            bond_utxo_key: str | None = None
                                            if bond_data:
                                                bond_utxo_key = (
                                                    f"{bond_data['utxo_txid']}:"
                                                    f"{bond_data['utxo_vout']}"
                                                )

                                            # Update cache using tuple key
                                            offer_key = (from_nick, oid)
                                            self._store_offer(offer_key, offer, bond_utxo_key)

                                            # Track this peer as "known" even if peerlist didn't
                                            # return features. This prevents re-triggering new peer
                                            # logic for every message from this peer.
                                            if from_nick not in self.peer_features:
                                                self.peer_features[from_nick] = {}

                                            logger.debug(
                                                f"Updated offer cache: {from_nick} "
                                                f"{offer_type_prefix} oid={oid}"
                                                + (" (with bond)" if bond_data else "")
                                            )
                                        except Exception as e:
                                            logger.debug(f"Failed to parse offer update: {e}")
                                    break
                except Exception as e:
                    logger.debug(f"Failed to process PUBMSG: {e}")

        except TimeoutError:
            continue
        except asyncio.CancelledError:
            logger.info(f"Continuous listening on {self.host}:{self.port} cancelled")
            break
        except Exception as e:
            logger.error(f"Error in continuous listening: {e}")
            if self.on_disconnect:
                self.on_disconnect()
            break

    self.running = False
    logger.info(f"Stopped continuous listening on {self.host}:{self.port}")

Continuously listen for messages and update internal offer/bond caches.

This method runs indefinitely until stop() is called or connection is lost. Used by orderbook_watcher and maker to maintain live orderbook state.

Args

request_orderbook
If True, send !orderbook request on startup to get current offers from makers. Set to False for maker bots that don't need to receive other offers.
async def listen_for_messages(self, duration: float = 5.0) ‑> list[dict[str, typing.Any]]
Expand source code
async def listen_for_messages(self, duration: float = 5.0) -> list[dict[str, Any]]:
    """
    Listen for messages for a specified duration.

    This method collects all messages received within the specified duration.
    It properly handles connection closed errors by raising DirectoryClientError.

    Args:
        duration: How long to listen in seconds

    Returns:
        List of received messages

    Raises:
        DirectoryClientError: If not connected or connection is lost
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    # Check connection state before starting
    if not self.connection.is_connected():
        raise DirectoryClientError("Connection closed")

    messages: list[dict[str, Any]] = []
    start_time = asyncio.get_event_loop().time()

    # First, drain any buffered messages into our result list
    # These are messages that were received while waiting for other responses
    while not self._message_buffer.empty():
        try:
            buffered_msg = self._message_buffer.get_nowait()
            logger.trace(
                f"Processing buffered message type {buffered_msg.get('type')}: "
                f"{buffered_msg.get('line', '')[:80]}..."
            )
            messages.append(buffered_msg)
        except asyncio.QueueEmpty:
            break

    while asyncio.get_event_loop().time() - start_time < duration:
        try:
            remaining_time = duration - (asyncio.get_event_loop().time() - start_time)
            if remaining_time <= 0:
                break

            response_data = await asyncio.wait_for(
                self.connection.receive(), timeout=remaining_time
            )
            response = json.loads(response_data.decode("utf-8"))
            logger.trace(
                f"Received message type {response.get('type')}: "
                f"{response.get('line', '')[:80]}..."
            )
            messages.append(response)

        except TimeoutError:
            # Normal timeout - no more messages within duration
            break
        except Exception as e:
            # Connection errors should propagate up so caller can reconnect
            error_msg = str(e).lower()
            if "connection" in error_msg and ("closed" in error_msg or "lost" in error_msg):
                raise DirectoryClientError(f"Connection lost: {e}") from e
            # Other errors (JSON parse, etc) - log and continue
            logger.warning(f"Error processing message: {e}")
            continue

    logger.trace(f"Collected {len(messages)} messages in {duration}s")
    return messages

Listen for messages for a specified duration.

This method collects all messages received within the specified duration. It properly handles connection closed errors by raising DirectoryClientError.

Args

duration
How long to listen in seconds

Returns

List of received messages

Raises

DirectoryClientError
If not connected or connection is lost
def remove_offers_for_nick(self, nick: str) ‑> int
Expand source code
def remove_offers_for_nick(self, nick: str) -> int:
    """
    Remove all offers from a specific nick (e.g., when nick goes offline).

    This is the equivalent of the reference implementation's on_nick_leave callback.

    Args:
        nick: The nick to remove offers for

    Returns:
        Number of offers removed
    """
    keys_to_remove = [key for key in self.offers if key[0] == nick]
    removed = 0

    for key in keys_to_remove:
        offer_data = self.offers.pop(key, None)
        if offer_data:
            removed += 1
            # Clean up bond mapping
            if offer_data.bond_utxo_key and offer_data.bond_utxo_key in self._bond_to_offers:
                self._bond_to_offers[offer_data.bond_utxo_key].discard(key)

    if removed > 0:
        logger.info(f"Removed {removed} offers for nick {nick} (left/offline)")

    # Also remove from peer_features and active_peers
    self.peer_features.pop(nick, None)
    self._active_peers.pop(nick, None)

    # Remove any bonds from this nick
    bonds_to_remove = [k for k, v in self.bonds.items() if v.counterparty == nick]
    for bond_key in bonds_to_remove:
        del self.bonds[bond_key]

    return removed

Remove all offers from a specific nick (e.g., when nick goes offline).

This is the equivalent of the reference implementation's on_nick_leave callback.

Args

nick
The nick to remove offers for

Returns

Number of offers removed

async def send_private_message(self, recipient: str, command: str, data: str) ‑> None
Expand source code
async def send_private_message(self, recipient: str, command: str, data: str) -> None:
    """
    Send a signed private message to a specific peer.

    JoinMarket requires all private messages to be signed with the sender's
    nick private key. The signature is appended to the message:
    Format: "!<command> <data> <pubkey_hex> <signature>"

    The message-to-sign is: data + hostid (to prevent replay attacks)
    Note: Only the data is signed, NOT the command prefix.

    Args:
        recipient: Target peer nick
        command: Command name (without ! prefix, e.g., 'fill', 'auth', 'tx')
        data: Command arguments to send (will be signed)
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    # Sign just the data (not the command) with our nick identity
    # Reference: rawmessage = ' '.join(message[1:].split(' ')[1:-2])
    # This means they extract [1:-2] which is the args, not the command
    # So we sign: data + hostid
    # IMPORTANT: Always use ONION_HOSTID ("onion-network"), NOT the directory hostname.
    # The reference implementation uses a fixed hostid for ALL onion message channels
    # (see jmdaemon/onionmc.py line 635: self.hostid = "onion-network")
    signed_data = self.nick_identity.sign_message(data, ONION_HOSTID)

    # JoinMarket message format: from_nick!to_nick!command <args>
    # The COMMAND_PREFIX ("!") is used ONLY as a field separator between
    # from_nick, to_nick, and the message content. The command itself
    # does NOT have a "!" prefix.
    # Format: "<command> <signed_data>" where signed_data = "<data> <pubkey_hex> <sig_b64>"
    full_message = f"{command} {signed_data}"

    privmsg = {
        "type": MessageType.PRIVMSG.value,
        "line": f"{self.nick}!{recipient}!{full_message}",
    }
    await self.connection.send(json.dumps(privmsg).encode("utf-8"))

Send a signed private message to a specific peer.

JoinMarket requires all private messages to be signed with the sender's nick private key. The signature is appended to the message: Format: "! "

The message-to-sign is: data + hostid (to prevent replay attacks) Note: Only the data is signed, NOT the command prefix.

Args

recipient
Target peer nick
command
Command name (without ! prefix, e.g., 'fill', 'auth', 'tx')
data
Command arguments to send (will be signed)
async def send_public_message(self, message: str) ‑> None
Expand source code
async def send_public_message(self, message: str) -> None:
    """
    Send a public message to all peers.

    Args:
        message: Message to broadcast
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    pubmsg = {
        "type": MessageType.PUBMSG.value,
        "line": f"{self.nick}!PUBLIC!{message}",
    }
    await self.connection.send(json.dumps(pubmsg).encode("utf-8"))

Send a public message to all peers.

Args

message
Message to broadcast
def stop(self) ‑> None
Expand source code
def stop(self) -> None:
    """Stop continuous listening."""
    self.running = False

Stop continuous listening.

def supports_extended_utxo_format(self) ‑> bool
Expand source code
def supports_extended_utxo_format(self) -> bool:
    """
    Check if we should use extended UTXO format with this directory.

    Extended format (txid:vout:scriptpubkey:blockheight) is used when
    both sides advertise neutrino_compat feature. Protocol version
    is not checked - features are negotiated independently.

    Returns:
        True if extended UTXO format should be used
    """
    return self.neutrino_compat and self.directory_neutrino_compat

Check if we should use extended UTXO format with this directory.

Extended format (txid:vout:scriptpubkey:blockheight) is used when both sides advertise neutrino_compat feature. Protocol version is not checked - features are negotiated independently.

Returns

True if extended UTXO format should be used

class DirectoryClientError (*args, **kwargs)
Expand source code
class DirectoryClientError(Exception):
    """Error raised by DirectoryClient operations."""

Error raised by DirectoryClient operations.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class OfferWithTimestamp (offer: Offer, received_at: float, bond_utxo_key: str | None = None)
Expand source code
class OfferWithTimestamp:
    """Wrapper for Offer with metadata for staleness tracking."""

    __slots__ = ("offer", "received_at", "bond_utxo_key")

    def __init__(self, offer: Offer, received_at: float, bond_utxo_key: str | None = None) -> None:
        self.offer = offer
        self.received_at = received_at
        # Bond UTXO key (txid:vout) for deduplication across nick changes
        self.bond_utxo_key = bond_utxo_key

Wrapper for Offer with metadata for staleness tracking.

Instance variables

var bond_utxo_key : None
Expand source code
class OfferWithTimestamp:
    """Wrapper for Offer with metadata for staleness tracking."""

    __slots__ = ("offer", "received_at", "bond_utxo_key")

    def __init__(self, offer: Offer, received_at: float, bond_utxo_key: str | None = None) -> None:
        self.offer = offer
        self.received_at = received_at
        # Bond UTXO key (txid:vout) for deduplication across nick changes
        self.bond_utxo_key = bond_utxo_key
var offer : None
Expand source code
class OfferWithTimestamp:
    """Wrapper for Offer with metadata for staleness tracking."""

    __slots__ = ("offer", "received_at", "bond_utxo_key")

    def __init__(self, offer: Offer, received_at: float, bond_utxo_key: str | None = None) -> None:
        self.offer = offer
        self.received_at = received_at
        # Bond UTXO key (txid:vout) for deduplication across nick changes
        self.bond_utxo_key = bond_utxo_key
var received_at : None
Expand source code
class OfferWithTimestamp:
    """Wrapper for Offer with metadata for staleness tracking."""

    __slots__ = ("offer", "received_at", "bond_utxo_key")

    def __init__(self, offer: Offer, received_at: float, bond_utxo_key: str | None = None) -> None:
        self.offer = offer
        self.received_at = received_at
        # Bond UTXO key (txid:vout) for deduplication across nick changes
        self.bond_utxo_key = bond_utxo_key