Module orderbook_watcher.directory_client
Legacy compatibility wrapper for DirectoryClient.
This module re-exports jmcore.DirectoryClient for backward compatibility. All new code should import directly from jmcore.directory_client.
Functions
def parse_fidelity_bond_proof(proof_base64: str, maker_nick: str, taker_nick: str, verify: bool = True) ‑> dict[str, typing.Any] | None-
Expand source code
def parse_fidelity_bond_proof( proof_base64: str, maker_nick: str, taker_nick: str, verify: bool = True ) -> dict[str, Any] | None: """ Parse and optionally verify a fidelity bond proof from base64-encoded binary data. Args: proof_base64: Base64-encoded bond proof maker_nick: Maker's nick taker_nick: Taker's nick (requesting party) verify: If True, verify both signatures in the proof (default: True) Returns: Dict with bond details or None if parsing/verification fails """ # First, verify the signatures if requested if verify: is_valid, verified_data, error = verify_fidelity_bond_proof( proof_base64, maker_nick, taker_nick ) if not is_valid: logger.warning(f"Fidelity bond proof verification failed for {maker_nick}: {error}") return None # Parse the proof data (also extracts redeem script) try: decoded_data = base64.b64decode(proof_base64) except (binascii.Error, ValueError) as e: logger.warning(f"Failed to decode bond proof: {e}") return None if len(decoded_data) != 252: logger.warning(f"Invalid bond proof length: {len(decoded_data)}, expected 252") return None try: unpacked_data = struct.unpack("<72s72s33sH33s32sII", decoded_data) txid = unpacked_data[5] vout = unpacked_data[6] locktime = unpacked_data[7] utxo_pub = unpacked_data[4] cert_pub = unpacked_data[2] cert_expiry_raw = unpacked_data[3] cert_expiry = cert_expiry_raw * 2016 utxo_pub_hex = binascii.hexlify(utxo_pub).decode("ascii") redeem_script = mk_freeze_script(utxo_pub_hex, locktime) redeem_script_hex = binascii.hexlify(redeem_script).decode("ascii") p2wsh_script = redeem_script_to_p2wsh_script(redeem_script) p2wsh_script_hex = binascii.hexlify(p2wsh_script).decode("ascii") return { "maker_nick": maker_nick, "taker_nick": taker_nick, "utxo_txid": binascii.hexlify(txid).decode("ascii"), "utxo_vout": vout, "locktime": locktime, "utxo_pub": utxo_pub_hex, "cert_pub": binascii.hexlify(cert_pub).decode("ascii"), "cert_expiry": cert_expiry, "proof": proof_base64, "redeem_script": redeem_script_hex, "p2wsh_script": p2wsh_script_hex, } except Exception as e: logger.warning(f"Failed to unpack bond proof: {e}") return NoneParse and optionally verify a fidelity bond proof from base64-encoded binary data.
Args
proof_base64- Base64-encoded bond proof
maker_nick- Maker's nick
taker_nick- Taker's nick (requesting party)
verify- If True, verify both signatures in the proof (default: True)
Returns
Dict with bond details or None if parsing/verification fails
Classes
class DirectoryClient (host: str,
port: int,
network: str,
nick_identity: NickIdentity | None = None,
location: str = 'NOT-SERVING-ONION',
socks_host: str = '127.0.0.1',
socks_port: int = 9050,
timeout: float = 30.0,
max_message_size: int = 2097152,
on_disconnect: Callable[[], None] | None = None,
neutrino_compat: bool = False)-
Expand source code
class DirectoryClient: """ Client for connecting to JoinMarket directory servers. Supports: - Direct TCP connections (for local/dev) - Tor connections (for .onion addresses) - Handshake protocol - Peerlist fetching - Orderbook fetching - Continuous listening for updates """ def __init__( self, host: str, port: int, network: str, nick_identity: NickIdentity | None = None, location: str = "NOT-SERVING-ONION", socks_host: str = "127.0.0.1", socks_port: int = 9050, timeout: float = 30.0, max_message_size: int = 2097152, on_disconnect: Callable[[], None] | None = None, neutrino_compat: bool = False, ) -> None: """ Initialize DirectoryClient. Args: host: Directory server hostname or .onion address port: Directory server port network: Bitcoin network (mainnet, testnet, signet, regtest) nick_identity: NickIdentity for message signing (generated if None) location: Our location string (onion address or NOT-SERVING-ONION) socks_host: SOCKS proxy host for Tor socks_port: SOCKS proxy port for Tor timeout: Connection timeout in seconds max_message_size: Maximum message size in bytes on_disconnect: Callback when connection drops neutrino_compat: Advertise support for Neutrino-compatible UTXO metadata """ self.host = host self.port = port self.network = network self.location = location self.socks_host = socks_host self.socks_port = socks_port self.timeout = timeout self.max_message_size = max_message_size self.connection: TCPConnection | None = None self.nick_identity = nick_identity or NickIdentity(JM_VERSION) self.nick = self.nick_identity.nick # hostid is used for message signing to prevent replay attacks # For onion-based networks, this is always "onion-network" self.hostid = "onion-network" self.offers: dict[tuple[str, int], Offer] = {} self.bonds: dict[str, FidelityBond] = {} self.peer_features: dict[str, dict[str, bool]] = {} # nick -> features dict self.running = False self.on_disconnect = on_disconnect self.initial_orderbook_received = False self.last_orderbook_request_time: float = 0.0 self.last_offer_received_time: float | None = None self.neutrino_compat = neutrino_compat # Version negotiation state (set after handshake) self.negotiated_version: int | None = None self.directory_neutrino_compat: bool = False self.directory_peerlist_features: bool = False # True if directory supports F: suffix # Timing intervals self.peerlist_check_interval = 1800.0 self.orderbook_refresh_interval = 1800.0 self.orderbook_retry_interval = 300.0 self.zero_offer_retry_interval = 600.0 # Peerlist support tracking # If the directory doesn't support getpeerlist (e.g., reference implementation), # we track this to avoid spamming unsupported requests self._peerlist_supported: bool | None = None # None = unknown, True/False = known self._last_peerlist_request_time: float = 0.0 self._peerlist_min_interval: float = 60.0 # Minimum seconds between peerlist requests async def connect(self) -> None: """Connect to the directory server and perform handshake.""" try: logger.debug(f"DirectoryClient.connect: connecting to {self.host}:{self.port}") if not self.host.endswith(".onion"): self.connection = await connect_direct( self.host, self.port, self.max_message_size, self.timeout, ) logger.debug("DirectoryClient.connect: direct connection established") else: self.connection = await connect_via_tor( self.host, self.port, self.socks_host, self.socks_port, self.max_message_size, self.timeout, ) logger.debug("DirectoryClient.connect: tor connection established") logger.debug("DirectoryClient.connect: starting handshake") await self._handshake() logger.debug("DirectoryClient.connect: handshake complete") except Exception as e: logger.error(f"Failed to connect to {self.host}:{self.port}: {e}", exc_info=True) # Clean up connection if handshake failed if self.connection: with contextlib.suppress(Exception): await self.connection.close() self.connection = None raise DirectoryClientError(f"Connection failed: {e}") from e async def _handshake(self) -> None: """ Perform directory server handshake with feature negotiation. We use proto-ver=5 for reference implementation compatibility. Features like neutrino_compat are negotiated independently via the features dict in the handshake payload. """ if not self.connection: raise DirectoryClientError("Not connected") # Build our feature set - always include peerlist_features to indicate we support # the extended peerlist format with F: suffix for feature information our_features: set[str] = {FEATURE_PEERLIST_FEATURES} if self.neutrino_compat: our_features.add(FEATURE_NEUTRINO_COMPAT) feature_set = FeatureSet(features=our_features) # Send our handshake with current version and features handshake_data = create_handshake_request( nick=self.nick, location=self.location, network=self.network, directory=False, features=feature_set, ) logger.debug(f"DirectoryClient._handshake: created handshake data: {handshake_data}") handshake_msg = { "type": MessageType.HANDSHAKE.value, "line": json.dumps(handshake_data), } logger.debug("DirectoryClient._handshake: sending handshake message") await self.connection.send(json.dumps(handshake_msg).encode("utf-8")) logger.debug("DirectoryClient._handshake: handshake sent, waiting for response") # Receive and parse directory's response response_data = await asyncio.wait_for(self.connection.receive(), timeout=self.timeout) logger.debug(f"DirectoryClient._handshake: received response: {response_data[:200]!r}") response = json.loads(response_data.decode("utf-8")) if response["type"] not in (MessageType.HANDSHAKE.value, MessageType.DN_HANDSHAKE.value): raise DirectoryClientError(f"Unexpected response type: {response['type']}") handshake_response = json.loads(response["line"]) if not handshake_response.get("accepted", False): raise DirectoryClientError("Handshake rejected") # Extract directory's version range # Reference directories only send "proto-ver" (single value, typically 5) dir_ver_min = handshake_response.get("proto-ver-min") dir_ver_max = handshake_response.get("proto-ver-max") if dir_ver_min is None or dir_ver_max is None: # Reference directory: only sends single proto-ver dir_version = handshake_response.get("proto-ver", 5) dir_ver_min = dir_ver_max = dir_version # Verify compatibility with our version (we only support v5) if not (dir_ver_min <= JM_VERSION <= dir_ver_max): raise DirectoryClientError( f"No compatible protocol version: we support v{JM_VERSION}, " f"directory supports [{dir_ver_min}, {dir_ver_max}]" ) # Use v5 (our only supported version) self.negotiated_version = JM_VERSION # Check if directory supports Neutrino-compatible metadata self.directory_neutrino_compat = peer_supports_neutrino_compat(handshake_response) # Check if directory supports peerlist_features (extended peerlist with F: suffix) dir_features = handshake_response.get("features", {}) self.directory_peerlist_features = dir_features.get(FEATURE_PEERLIST_FEATURES, False) logger.info( f"Handshake successful with {self.host}:{self.port} (nick: {self.nick}, " f"negotiated_version: v{self.negotiated_version}, " f"neutrino_compat: {self.directory_neutrino_compat}, " f"peerlist_features: {self.directory_peerlist_features})" ) async def get_peerlist(self) -> list[str]: """ Fetch the current list of connected peers. Note: Reference implementation directories do NOT support GETPEERLIST. This method shares peerlist support tracking with get_peerlist_with_features(). Returns: List of active peer nicks. Returns empty list if directory doesn't support GETPEERLIST. """ if not self.connection: raise DirectoryClientError("Not connected") # Skip if we already know this directory doesn't support GETPEERLIST if self._peerlist_supported is False: logger.debug("Skipping GETPEERLIST - directory doesn't support it") return [] # Rate-limit peerlist requests to avoid spamming import time current_time = time.time() if current_time - self._last_peerlist_request_time < self._peerlist_min_interval: logger.debug( f"Skipping GETPEERLIST - rate limited " f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)" ) return [] self._last_peerlist_request_time = current_time getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""} logger.debug("Sending GETPEERLIST request") await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8")) start_time = asyncio.get_event_loop().time() response = None while True: elapsed = asyncio.get_event_loop().time() - start_time if elapsed > self.timeout: # Timeout without PEERLIST response - directory likely doesn't support it logger.info( f"Timed out waiting for PEERLIST from {self.host}:{self.port} - " "directory likely doesn't support GETPEERLIST (reference implementation)" ) self._peerlist_supported = False return [] try: response_data = await asyncio.wait_for( self.connection.receive(), timeout=self.timeout - elapsed ) response = json.loads(response_data.decode("utf-8")) msg_type = response.get("type") logger.debug(f"Received response type: {msg_type}") if msg_type == MessageType.PEERLIST.value: break logger.debug( f"Skipping unexpected message type {msg_type} while waiting for PEERLIST" ) except TimeoutError: # Timeout without PEERLIST response - directory likely doesn't support it logger.info( f"Timed out waiting for PEERLIST from {self.host}:{self.port} - " "directory likely doesn't support GETPEERLIST (reference implementation)" ) self._peerlist_supported = False return [] except Exception as e: logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}") if asyncio.get_event_loop().time() - start_time > self.timeout: self._peerlist_supported = False return [] peerlist_str = response["line"] logger.debug(f"Peerlist string: {peerlist_str}") # Mark peerlist as supported since we got a valid response self._peerlist_supported = True if not peerlist_str: return [] peers = [] for entry in peerlist_str.split(","): # Skip empty entries if not entry or not entry.strip(): continue # Skip entries without separator - these are metadata (e.g., 'peerlist_features') # from the reference implementation, not actual peer entries if NICK_PEERLOCATOR_SEPARATOR not in entry: logger.debug(f"Skipping metadata entry in peerlist: '{entry}'") continue try: nick, location, disconnected, _features = parse_peerlist_entry(entry) logger.debug(f"Parsed peer: {nick} at {location}, disconnected={disconnected}") if not disconnected: peers.append(nick) except ValueError as e: logger.warning(f"Failed to parse peerlist entry '{entry}': {e}") continue logger.info(f"Received {len(peers)} active peers from {self.host}:{self.port}") return peers async def get_peerlist_with_features(self) -> list[tuple[str, str, FeatureSet]]: """ Fetch the current list of connected peers with their features. Uses the standard GETPEERLIST message. If the directory supports peerlist_features, the response will include F: suffix with features. Note: Reference implementation directories do NOT support GETPEERLIST. This method tracks whether the directory supports it and skips requests to unsupported directories to avoid spamming warnings in their logs. Returns: List of (nick, location, features) tuples for active peers. Features will be empty for directories that don't support peerlist_features. Returns empty list if directory doesn't support GETPEERLIST. """ if not self.connection: raise DirectoryClientError("Not connected") # Skip if we already know this directory doesn't support GETPEERLIST if self._peerlist_supported is False: logger.debug("Skipping GETPEERLIST - directory doesn't support it") return [] # Rate-limit peerlist requests to avoid spamming import time current_time = time.time() if current_time - self._last_peerlist_request_time < self._peerlist_min_interval: logger.debug( f"Skipping GETPEERLIST - rate limited " f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)" ) return [] self._last_peerlist_request_time = current_time getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""} logger.debug("Sending GETPEERLIST request") await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8")) start_time = asyncio.get_event_loop().time() response = None while True: elapsed = asyncio.get_event_loop().time() - start_time if elapsed > self.timeout: # Timeout without PEERLIST response - directory likely doesn't support it logger.info( f"Timed out waiting for PEERLIST from {self.host}:{self.port} - " "directory likely doesn't support GETPEERLIST (reference implementation)" ) self._peerlist_supported = False return [] try: response_data = await asyncio.wait_for( self.connection.receive(), timeout=self.timeout - elapsed ) response = json.loads(response_data.decode("utf-8")) msg_type = response.get("type") logger.debug(f"Received response type: {msg_type}") if msg_type == MessageType.PEERLIST.value: break logger.debug( f"Skipping unexpected message type {msg_type} while waiting for PEERLIST" ) except TimeoutError: # Timeout without PEERLIST response - directory likely doesn't support it logger.info( f"Timed out waiting for PEERLIST from {self.host}:{self.port} - " "directory likely doesn't support GETPEERLIST (reference implementation)" ) self._peerlist_supported = False return [] except Exception as e: logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}") if asyncio.get_event_loop().time() - start_time > self.timeout: self._peerlist_supported = False return [] peerlist_str = response["line"] logger.debug(f"Peerlist string: {peerlist_str}") # Mark peerlist as supported since we got a valid response self._peerlist_supported = True if not peerlist_str: return [] peers: list[tuple[str, str, FeatureSet]] = [] for entry in peerlist_str.split(","): # Skip empty entries if not entry or not entry.strip(): continue # Skip entries without separator - these are metadata (e.g., 'peerlist_features') # from the reference implementation, not actual peer entries if NICK_PEERLOCATOR_SEPARATOR not in entry: logger.debug(f"Skipping metadata entry in peerlist: '{entry}'") continue try: nick, location, disconnected, features = parse_peerlist_entry(entry) logger.debug( f"Parsed peer: {nick} at {location}, " f"disconnected={disconnected}, features={features.to_comma_string()}" ) if not disconnected: peers.append((nick, location, features)) # Always update peer_features cache to track that we've seen this peer # This prevents triggering "new peer" logic for every message from this peer self.peer_features[nick] = features.to_dict() except ValueError as e: logger.warning(f"Failed to parse peerlist entry '{entry}': {e}") continue logger.info( f"Received {len(peers)} active peers with features from {self.host}:{self.port}" ) return peers async def listen_for_messages(self, duration: float = 5.0) -> list[dict[str, Any]]: """ Listen for messages for a specified duration. This method collects all messages received within the specified duration. It properly handles connection closed errors by raising DirectoryClientError. Args: duration: How long to listen in seconds Returns: List of received messages Raises: DirectoryClientError: If not connected or connection is lost """ if not self.connection: raise DirectoryClientError("Not connected") # Check connection state before starting if not self.connection.is_connected(): raise DirectoryClientError("Connection closed") messages: list[dict[str, Any]] = [] start_time = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start_time < duration: try: remaining_time = duration - (asyncio.get_event_loop().time() - start_time) if remaining_time <= 0: break response_data = await asyncio.wait_for( self.connection.receive(), timeout=remaining_time ) response = json.loads(response_data.decode("utf-8")) logger.trace( f"Received message type {response.get('type')}: " f"{response.get('line', '')[:80]}..." ) messages.append(response) except TimeoutError: # Normal timeout - no more messages within duration break except Exception as e: # Connection errors should propagate up so caller can reconnect error_msg = str(e).lower() if "connection" in error_msg and ("closed" in error_msg or "lost" in error_msg): raise DirectoryClientError(f"Connection lost: {e}") from e # Other errors (JSON parse, etc) - log and continue logger.warning(f"Error processing message: {e}") continue logger.trace(f"Collected {len(messages)} messages in {duration}s") return messages async def fetch_orderbooks(self) -> tuple[list[Offer], list[FidelityBond]]: """ Fetch orderbooks from all connected peers. Returns: Tuple of (offers, fidelity_bonds) """ # Use get_peerlist_with_features to populate peer_features cache peers_with_features = await self.get_peerlist_with_features() offers: list[Offer] = [] bonds: list[FidelityBond] = [] bond_utxo_set: set[str] = set() # Build set of active nicks for filtering stale offers # Use peerlist_with_features if available, otherwise fall back to basic peerlist active_nicks: set[str] = set() if peers_with_features: active_nicks = {nick for nick, _loc, _features in peers_with_features} logger.info(f"Found {len(peers_with_features)} peers on {self.host}:{self.port}") else: # Fallback for directories without peerlist_features support (reference impl) # or when all peers are NOT-SERVING-ONION (regtest/local) try: basic_peerlist = await self.get_peerlist() if basic_peerlist: active_nicks = set(basic_peerlist) logger.info( f"Found {len(basic_peerlist)} peers on {self.host}:{self.port} (basic peerlist)" ) else: logger.info( f"Peerlist empty on {self.host}:{self.port} (makers may be NOT-SERVING-ONION)" ) except DirectoryClientError as e: logger.warning(f"Failed to get basic peerlist: {e}") if not self.connection: raise DirectoryClientError("Not connected") pubmsg = { "type": MessageType.PUBMSG.value, "line": f"{self.nick}!PUBLIC!!orderbook", } await self.connection.send(json.dumps(pubmsg).encode("utf-8")) logger.debug("Sent !orderbook broadcast to PUBLIC") logger.info("Listening for offer announcements for 10 seconds...") messages = await self.listen_for_messages(duration=10.0) logger.info(f"Received {len(messages)} messages, parsing offers...") for response in messages: try: msg_type = response.get("type") if msg_type not in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value): logger.debug(f"Skipping message type {msg_type}") continue line = response["line"] logger.debug(f"Processing message type {msg_type}: {line[:100]}...") parts = line.split(COMMAND_PREFIX) if len(parts) < 3: logger.debug(f"Message has insufficient parts: {len(parts)}") continue from_nick = parts[0] to_nick = parts[1] rest = COMMAND_PREFIX.join(parts[2:]) if not rest.strip(): logger.debug("Empty message content") continue offer_types = ["sw0absoffer", "sw0reloffer", "swabsoffer", "swreloffer"] parsed = False for offer_type in offer_types: if rest.startswith(offer_type): try: # Split on '!' to extract flags (neutrino, tbond) # Format: sw0reloffer 0 750000 790107726787 500 0.001!neutrino!tbond <proof> # NOTE: !neutrino in offers is deprecated - primary detection is via # handshake features. This parsing is kept for backwards compatibility. rest_parts = rest.split(COMMAND_PREFIX) offer_line = rest_parts[0] bond_data = None neutrino_compat = False # Parse flags after the offer line (backwards compat for !neutrino) for flag_part in rest_parts[1:]: if flag_part.startswith("neutrino"): neutrino_compat = True logger.debug(f"Maker {from_nick} requires neutrino_compat") elif flag_part.startswith("tbond "): bond_parts = flag_part[6:].split() if bond_parts: bond_proof_b64 = bond_parts[0] # For PRIVMSG, the maker signs with taker's actual nick # For PUBMSG, both nicks are the maker's (self-signed) is_privmsg = msg_type == MessageType.PRIVMSG.value taker_nick_for_proof = to_nick if is_privmsg else from_nick bond_data = parse_fidelity_bond_proof( bond_proof_b64, from_nick, taker_nick_for_proof ) if bond_data: logger.debug( f"Parsed fidelity bond from {from_nick}: " f"txid={bond_data['utxo_txid'][:16]}..., " f"locktime={bond_data['locktime']}" ) utxo_str = ( f"{bond_data['utxo_txid']}:{bond_data['utxo_vout']}" ) if utxo_str not in bond_utxo_set: bond_utxo_set.add(utxo_str) bond = FidelityBond( counterparty=from_nick, utxo_txid=bond_data["utxo_txid"], utxo_vout=bond_data["utxo_vout"], locktime=bond_data["locktime"], script=bond_data["utxo_pub"], utxo_confirmations=0, cert_expiry=bond_data["cert_expiry"], fidelity_bond_data=bond_data, ) bonds.append(bond) offer_parts = offer_line.split() if len(offer_parts) < 6: logger.warning( f"Offer from {from_nick} has {len(offer_parts)} parts, need 6" ) continue oid = int(offer_parts[1]) minsize = int(offer_parts[2]) maxsize = int(offer_parts[3]) txfee = int(offer_parts[4]) cjfee_str = offer_parts[5] if offer_type in ["sw0absoffer", "swabsoffer"]: cjfee = str(int(cjfee_str)) else: cjfee = str(Decimal(cjfee_str)) offer = Offer( counterparty=from_nick, oid=oid, ordertype=OfferType(offer_type), minsize=minsize, maxsize=maxsize, txfee=txfee, cjfee=cjfee, fidelity_bond_value=0, neutrino_compat=neutrino_compat, features=self.peer_features.get(from_nick, {}), ) offers.append(offer) if bond_data: offer.fidelity_bond_data = bond_data logger.info( f"Parsed {offer_type} from {from_nick}: " f"oid={oid}, size={minsize}-{maxsize}, fee={cjfee}, " f"has_bond={bond_data is not None}, neutrino_compat={neutrino_compat}" ) parsed = True except Exception as e: logger.warning(f"Failed to parse {offer_type} from {from_nick}: {e}") break if not parsed: logger.debug(f"Message not an offer: {rest[:50]}...") except Exception as e: logger.warning(f"Failed to process message: {e}") continue # Filter offers to only include makers that are still in the current peerlist. # This prevents selecting stale offers from makers that have disconnected. # This is especially important for flaky tests where makers may restart or # disconnect between orderbook fetch and CoinJoin execution. # # Note: If peerlist is empty, we skip filtering and trust the offers. This happens when: # 1. All peers use NOT-SERVING-ONION (regtest/local environments) # 2. Directory doesn't support GETPEERLIST (reference implementation) # # The directory server will still reject messages to disconnected peers, # so we're not at risk of sending messages to offline makers. if active_nicks: original_count = len(offers) offers = [o for o in offers if o.counterparty in active_nicks] filtered_count = original_count - len(offers) if filtered_count > 0: logger.warning( f"Filtered out {filtered_count} stale offers from disconnected makers" ) elif self._peerlist_supported is False: logger.debug( "Skipping offer filtering - directory doesn't support GETPEERLIST " "(reference implementation)" ) logger.info( f"Fetched {len(offers)} offers and {len(bonds)} fidelity bonds from " f"{self.host}:{self.port}" ) return offers, bonds async def send_public_message(self, message: str) -> None: """ Send a public message to all peers. Args: message: Message to broadcast """ if not self.connection: raise DirectoryClientError("Not connected") pubmsg = { "type": MessageType.PUBMSG.value, "line": f"{self.nick}!PUBLIC!{message}", } await self.connection.send(json.dumps(pubmsg).encode("utf-8")) async def send_private_message(self, recipient: str, command: str, data: str) -> None: """ Send a signed private message to a specific peer. JoinMarket requires all private messages to be signed with the sender's nick private key. The signature is appended to the message: Format: "!<command> <data> <pubkey_hex> <signature>" The message-to-sign is: data + hostid (to prevent replay attacks) Note: Only the data is signed, NOT the command prefix. Args: recipient: Target peer nick command: Command name (without ! prefix, e.g., 'fill', 'auth', 'tx') data: Command arguments to send (will be signed) """ if not self.connection: raise DirectoryClientError("Not connected") # Sign just the data (not the command) with our nick identity # Reference: rawmessage = ' '.join(message[1:].split(' ')[1:-2]) # This means they extract [1:-2] which is the args, not the command # So we sign: data + hostid signed_data = self.nick_identity.sign_message(data, self.hostid) # JoinMarket message format: from_nick!to_nick!command <args> # The COMMAND_PREFIX ("!") is used ONLY as a field separator between # from_nick, to_nick, and the message content. The command itself # does NOT have a "!" prefix. # Format: "<command> <signed_data>" where signed_data = "<data> <pubkey_hex> <sig_b64>" full_message = f"{command} {signed_data}" privmsg = { "type": MessageType.PRIVMSG.value, "line": f"{self.nick}!{recipient}!{full_message}", } await self.connection.send(json.dumps(privmsg).encode("utf-8")) async def close(self) -> None: """Close the connection to the directory server.""" if self.connection: try: # NOTE: We skip sending DISCONNECT (801) because the reference implementation # crashes on unhandled control messages. pass except Exception: pass finally: await self.connection.close() self.connection = None def stop(self) -> None: """Stop continuous listening.""" self.running = False async def listen_continuously(self, request_orderbook: bool = True) -> None: """ Continuously listen for messages and update internal offer/bond caches. This method runs indefinitely until stop() is called or connection is lost. Used by orderbook_watcher and maker to maintain live orderbook state. Args: request_orderbook: If True, send !orderbook request on startup to get current offers from makers. Set to False for maker bots that don't need to receive other offers. """ if not self.connection: raise DirectoryClientError("Not connected") logger.info(f"Starting continuous listening on {self.host}:{self.port}") self.running = True # Fetch peerlist with features to populate peer_features cache # This allows us to know which features each maker supports # Note: This may return empty if directory doesn't support GETPEERLIST (reference impl) try: await self.get_peerlist_with_features() if self._peerlist_supported: logger.info(f"Populated peer_features cache with {len(self.peer_features)} peers") else: logger.info( "Directory doesn't support GETPEERLIST - peer features will be " "learned from offer messages" ) except Exception as e: logger.warning(f"Failed to fetch peerlist with features: {e}") # Request current orderbook from makers if request_orderbook: try: pubmsg = { "type": MessageType.PUBMSG.value, "line": f"{self.nick}!PUBLIC!!orderbook", } await self.connection.send(json.dumps(pubmsg).encode("utf-8")) logger.info("Sent !orderbook request to get current offers") except Exception as e: logger.warning(f"Failed to send !orderbook request: {e}") # Track when we last sent an orderbook request (to avoid spamming) import time last_orderbook_request = time.time() orderbook_request_min_interval = 60.0 # Minimum 60 seconds between requests while self.running: try: # Read next message with timeout data = await asyncio.wait_for(self.connection.receive(), timeout=5.0) if not data: logger.warning(f"Connection to {self.host}:{self.port} closed") break message = json.loads(data.decode("utf-8")) msg_type = message.get("type") line = message.get("line", "") # Process PUBMSG and PRIVMSG to update offers/bonds cache # Reference implementation sends offer responses to !orderbook via PRIVMSG if msg_type in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value): try: parts = line.split(COMMAND_PREFIX) if len(parts) >= 3: from_nick = parts[0] to_nick = parts[1] rest = COMMAND_PREFIX.join(parts[2:]) # Accept PUBLIC broadcasts or messages addressed to us if to_nick == "PUBLIC" or to_nick == self.nick: # If we don't have features for this peer, it's a new peer. # We can try to refresh peerlist, but respect rate limits # and don't spam if directory doesn't support it. is_new_peer = from_nick not in self.peer_features current_time = time.time() if is_new_peer and self._peerlist_supported is not False: # Only refresh peerlist if we haven't recently # (get_peerlist_with_features has its own rate limiting) try: await self.get_peerlist_with_features() if self._peerlist_supported: logger.debug( f"Refreshed peerlist (new peer: {from_nick}), " f"now tracking {len(self.peer_features)} peers" ) except Exception as e: logger.debug(f"Failed to refresh peerlist: {e}") # Request orderbook from new peer (rate-limited) if ( request_orderbook and current_time - last_orderbook_request > orderbook_request_min_interval ): try: pubmsg = { "type": MessageType.PUBMSG.value, "line": f"{self.nick}!PUBLIC!!orderbook", } await self.connection.send( json.dumps(pubmsg).encode("utf-8") ) last_orderbook_request = current_time logger.info( f"Sent !orderbook request for new peer {from_nick}" ) except Exception as e: logger.debug(f"Failed to send !orderbook: {e}") # Parse offer announcements for offer_type_prefix in [ "sw0reloffer", "sw0absoffer", "swreloffer", "swabsoffer", ]: if rest.startswith(offer_type_prefix): # Separate offer from fidelity bond data rest_parts = rest.split(COMMAND_PREFIX, 1) offer_line = rest_parts[0].strip() # Parse fidelity bond if present bond_data = None if len(rest_parts) > 1 and rest_parts[1].startswith( "tbond " ): bond_parts = rest_parts[1][6:].split() if bond_parts: bond_proof_b64 = bond_parts[0] # For PUBLIC announcements, maker uses their own nick # as taker_nick when creating the proof. # For PRIVMSG (response to !orderbook), maker signs # for the recipient (us). taker_nick_for_proof = ( from_nick if to_nick == "PUBLIC" else to_nick ) bond_data = parse_fidelity_bond_proof( bond_proof_b64, from_nick, taker_nick_for_proof ) if bond_data: logger.debug( f"Parsed fidelity bond from {from_nick}: " f"txid={bond_data['utxo_txid'][:16]}..., " f"locktime={bond_data['locktime']}" ) # Store bond in bonds cache utxo_str = ( f"{bond_data['utxo_txid']}:" f"{bond_data['utxo_vout']}" ) bond = FidelityBond( counterparty=from_nick, utxo_txid=bond_data["utxo_txid"], utxo_vout=bond_data["utxo_vout"], locktime=bond_data["locktime"], script=bond_data["utxo_pub"], utxo_confirmations=0, cert_expiry=bond_data["cert_expiry"], fidelity_bond_data=bond_data, ) self.bonds[utxo_str] = bond offer_parts = offer_line.split() if len(offer_parts) >= 6: try: oid = int(offer_parts[1]) minsize = int(offer_parts[2]) maxsize = int(offer_parts[3]) txfee = int(offer_parts[4]) cjfee_str = offer_parts[5] if offer_type_prefix in [ "sw0absoffer", "swabsoffer", ]: cjfee = str(int(cjfee_str)) else: cjfee = str(Decimal(cjfee_str)) offer = Offer( counterparty=from_nick, oid=oid, ordertype=OfferType(offer_type_prefix), minsize=minsize, maxsize=maxsize, txfee=txfee, cjfee=cjfee, fidelity_bond_value=0, fidelity_bond_data=bond_data, features=self.peer_features.get(from_nick, {}), ) # Update cache using tuple key offer_key = (from_nick, oid) self.offers[offer_key] = offer # Track this peer as "known" even if peerlist didn't # return features. This prevents re-triggering new peer # logic for every message from this peer. if from_nick not in self.peer_features: self.peer_features[from_nick] = {} logger.debug( f"Updated offer cache: {from_nick} " f"{offer_type_prefix} oid={oid}" + (" (with bond)" if bond_data else "") ) except Exception as e: logger.debug(f"Failed to parse offer update: {e}") break except Exception as e: logger.debug(f"Failed to process PUBMSG: {e}") except TimeoutError: continue except asyncio.CancelledError: logger.info(f"Continuous listening on {self.host}:{self.port} cancelled") break except Exception as e: logger.error(f"Error in continuous listening: {e}") if self.on_disconnect: self.on_disconnect() break self.running = False logger.info(f"Stopped continuous listening on {self.host}:{self.port}") def get_current_offers(self) -> list[Offer]: """Get the current list of cached offers.""" return list(self.offers.values()) def get_current_bonds(self) -> list[FidelityBond]: """Get the current list of cached fidelity bonds.""" return list(self.bonds.values()) def supports_extended_utxo_format(self) -> bool: """ Check if we should use extended UTXO format with this directory. Extended format (txid:vout:scriptpubkey:blockheight) is used when both sides advertise neutrino_compat feature. Protocol version is not checked - features are negotiated independently. Returns: True if extended UTXO format should be used """ return self.neutrino_compat and self.directory_neutrino_compat def get_negotiated_version(self) -> int: """ Get the negotiated protocol version. Returns: Negotiated version (always 5 with feature-based approach) """ return self.negotiated_version if self.negotiated_version is not None else JM_VERSIONClient for connecting to JoinMarket directory servers.
Supports: - Direct TCP connections (for local/dev) - Tor connections (for .onion addresses) - Handshake protocol - Peerlist fetching - Orderbook fetching - Continuous listening for updates
Initialize DirectoryClient.
Args
host- Directory server hostname or .onion address
port- Directory server port
network- Bitcoin network (mainnet, testnet, signet, regtest)
nick_identity- NickIdentity for message signing (generated if None)
location- Our location string (onion address or NOT-SERVING-ONION)
socks_host- SOCKS proxy host for Tor
socks_port- SOCKS proxy port for Tor
timeout- Connection timeout in seconds
max_message_size- Maximum message size in bytes
on_disconnect- Callback when connection drops
neutrino_compat- Advertise support for Neutrino-compatible UTXO metadata
Methods
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """Close the connection to the directory server.""" if self.connection: try: # NOTE: We skip sending DISCONNECT (801) because the reference implementation # crashes on unhandled control messages. pass except Exception: pass finally: await self.connection.close() self.connection = NoneClose the connection to the directory server.
async def connect(self) ‑> None-
Expand source code
async def connect(self) -> None: """Connect to the directory server and perform handshake.""" try: logger.debug(f"DirectoryClient.connect: connecting to {self.host}:{self.port}") if not self.host.endswith(".onion"): self.connection = await connect_direct( self.host, self.port, self.max_message_size, self.timeout, ) logger.debug("DirectoryClient.connect: direct connection established") else: self.connection = await connect_via_tor( self.host, self.port, self.socks_host, self.socks_port, self.max_message_size, self.timeout, ) logger.debug("DirectoryClient.connect: tor connection established") logger.debug("DirectoryClient.connect: starting handshake") await self._handshake() logger.debug("DirectoryClient.connect: handshake complete") except Exception as e: logger.error(f"Failed to connect to {self.host}:{self.port}: {e}", exc_info=True) # Clean up connection if handshake failed if self.connection: with contextlib.suppress(Exception): await self.connection.close() self.connection = None raise DirectoryClientError(f"Connection failed: {e}") from eConnect to the directory server and perform handshake.
async def fetch_orderbooks(self) ‑> tuple[list[Offer], list[FidelityBond]]-
Expand source code
async def fetch_orderbooks(self) -> tuple[list[Offer], list[FidelityBond]]: """ Fetch orderbooks from all connected peers. Returns: Tuple of (offers, fidelity_bonds) """ # Use get_peerlist_with_features to populate peer_features cache peers_with_features = await self.get_peerlist_with_features() offers: list[Offer] = [] bonds: list[FidelityBond] = [] bond_utxo_set: set[str] = set() # Build set of active nicks for filtering stale offers # Use peerlist_with_features if available, otherwise fall back to basic peerlist active_nicks: set[str] = set() if peers_with_features: active_nicks = {nick for nick, _loc, _features in peers_with_features} logger.info(f"Found {len(peers_with_features)} peers on {self.host}:{self.port}") else: # Fallback for directories without peerlist_features support (reference impl) # or when all peers are NOT-SERVING-ONION (regtest/local) try: basic_peerlist = await self.get_peerlist() if basic_peerlist: active_nicks = set(basic_peerlist) logger.info( f"Found {len(basic_peerlist)} peers on {self.host}:{self.port} (basic peerlist)" ) else: logger.info( f"Peerlist empty on {self.host}:{self.port} (makers may be NOT-SERVING-ONION)" ) except DirectoryClientError as e: logger.warning(f"Failed to get basic peerlist: {e}") if not self.connection: raise DirectoryClientError("Not connected") pubmsg = { "type": MessageType.PUBMSG.value, "line": f"{self.nick}!PUBLIC!!orderbook", } await self.connection.send(json.dumps(pubmsg).encode("utf-8")) logger.debug("Sent !orderbook broadcast to PUBLIC") logger.info("Listening for offer announcements for 10 seconds...") messages = await self.listen_for_messages(duration=10.0) logger.info(f"Received {len(messages)} messages, parsing offers...") for response in messages: try: msg_type = response.get("type") if msg_type not in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value): logger.debug(f"Skipping message type {msg_type}") continue line = response["line"] logger.debug(f"Processing message type {msg_type}: {line[:100]}...") parts = line.split(COMMAND_PREFIX) if len(parts) < 3: logger.debug(f"Message has insufficient parts: {len(parts)}") continue from_nick = parts[0] to_nick = parts[1] rest = COMMAND_PREFIX.join(parts[2:]) if not rest.strip(): logger.debug("Empty message content") continue offer_types = ["sw0absoffer", "sw0reloffer", "swabsoffer", "swreloffer"] parsed = False for offer_type in offer_types: if rest.startswith(offer_type): try: # Split on '!' to extract flags (neutrino, tbond) # Format: sw0reloffer 0 750000 790107726787 500 0.001!neutrino!tbond <proof> # NOTE: !neutrino in offers is deprecated - primary detection is via # handshake features. This parsing is kept for backwards compatibility. rest_parts = rest.split(COMMAND_PREFIX) offer_line = rest_parts[0] bond_data = None neutrino_compat = False # Parse flags after the offer line (backwards compat for !neutrino) for flag_part in rest_parts[1:]: if flag_part.startswith("neutrino"): neutrino_compat = True logger.debug(f"Maker {from_nick} requires neutrino_compat") elif flag_part.startswith("tbond "): bond_parts = flag_part[6:].split() if bond_parts: bond_proof_b64 = bond_parts[0] # For PRIVMSG, the maker signs with taker's actual nick # For PUBMSG, both nicks are the maker's (self-signed) is_privmsg = msg_type == MessageType.PRIVMSG.value taker_nick_for_proof = to_nick if is_privmsg else from_nick bond_data = parse_fidelity_bond_proof( bond_proof_b64, from_nick, taker_nick_for_proof ) if bond_data: logger.debug( f"Parsed fidelity bond from {from_nick}: " f"txid={bond_data['utxo_txid'][:16]}..., " f"locktime={bond_data['locktime']}" ) utxo_str = ( f"{bond_data['utxo_txid']}:{bond_data['utxo_vout']}" ) if utxo_str not in bond_utxo_set: bond_utxo_set.add(utxo_str) bond = FidelityBond( counterparty=from_nick, utxo_txid=bond_data["utxo_txid"], utxo_vout=bond_data["utxo_vout"], locktime=bond_data["locktime"], script=bond_data["utxo_pub"], utxo_confirmations=0, cert_expiry=bond_data["cert_expiry"], fidelity_bond_data=bond_data, ) bonds.append(bond) offer_parts = offer_line.split() if len(offer_parts) < 6: logger.warning( f"Offer from {from_nick} has {len(offer_parts)} parts, need 6" ) continue oid = int(offer_parts[1]) minsize = int(offer_parts[2]) maxsize = int(offer_parts[3]) txfee = int(offer_parts[4]) cjfee_str = offer_parts[5] if offer_type in ["sw0absoffer", "swabsoffer"]: cjfee = str(int(cjfee_str)) else: cjfee = str(Decimal(cjfee_str)) offer = Offer( counterparty=from_nick, oid=oid, ordertype=OfferType(offer_type), minsize=minsize, maxsize=maxsize, txfee=txfee, cjfee=cjfee, fidelity_bond_value=0, neutrino_compat=neutrino_compat, features=self.peer_features.get(from_nick, {}), ) offers.append(offer) if bond_data: offer.fidelity_bond_data = bond_data logger.info( f"Parsed {offer_type} from {from_nick}: " f"oid={oid}, size={minsize}-{maxsize}, fee={cjfee}, " f"has_bond={bond_data is not None}, neutrino_compat={neutrino_compat}" ) parsed = True except Exception as e: logger.warning(f"Failed to parse {offer_type} from {from_nick}: {e}") break if not parsed: logger.debug(f"Message not an offer: {rest[:50]}...") except Exception as e: logger.warning(f"Failed to process message: {e}") continue # Filter offers to only include makers that are still in the current peerlist. # This prevents selecting stale offers from makers that have disconnected. # This is especially important for flaky tests where makers may restart or # disconnect between orderbook fetch and CoinJoin execution. # # Note: If peerlist is empty, we skip filtering and trust the offers. This happens when: # 1. All peers use NOT-SERVING-ONION (regtest/local environments) # 2. Directory doesn't support GETPEERLIST (reference implementation) # # The directory server will still reject messages to disconnected peers, # so we're not at risk of sending messages to offline makers. if active_nicks: original_count = len(offers) offers = [o for o in offers if o.counterparty in active_nicks] filtered_count = original_count - len(offers) if filtered_count > 0: logger.warning( f"Filtered out {filtered_count} stale offers from disconnected makers" ) elif self._peerlist_supported is False: logger.debug( "Skipping offer filtering - directory doesn't support GETPEERLIST " "(reference implementation)" ) logger.info( f"Fetched {len(offers)} offers and {len(bonds)} fidelity bonds from " f"{self.host}:{self.port}" ) return offers, bondsFetch orderbooks from all connected peers.
Returns
Tuple of (offers, fidelity_bonds)
def get_current_bonds(self) ‑> list[FidelityBond]-
Expand source code
def get_current_bonds(self) -> list[FidelityBond]: """Get the current list of cached fidelity bonds.""" return list(self.bonds.values())Get the current list of cached fidelity bonds.
def get_current_offers(self) ‑> list[Offer]-
Expand source code
def get_current_offers(self) -> list[Offer]: """Get the current list of cached offers.""" return list(self.offers.values())Get the current list of cached offers.
def get_negotiated_version(self) ‑> int-
Expand source code
def get_negotiated_version(self) -> int: """ Get the negotiated protocol version. Returns: Negotiated version (always 5 with feature-based approach) """ return self.negotiated_version if self.negotiated_version is not None else JM_VERSIONGet the negotiated protocol version.
Returns
Negotiated version (always 5 with feature-based approach)
async def get_peerlist(self) ‑> list[str]-
Expand source code
async def get_peerlist(self) -> list[str]: """ Fetch the current list of connected peers. Note: Reference implementation directories do NOT support GETPEERLIST. This method shares peerlist support tracking with get_peerlist_with_features(). Returns: List of active peer nicks. Returns empty list if directory doesn't support GETPEERLIST. """ if not self.connection: raise DirectoryClientError("Not connected") # Skip if we already know this directory doesn't support GETPEERLIST if self._peerlist_supported is False: logger.debug("Skipping GETPEERLIST - directory doesn't support it") return [] # Rate-limit peerlist requests to avoid spamming import time current_time = time.time() if current_time - self._last_peerlist_request_time < self._peerlist_min_interval: logger.debug( f"Skipping GETPEERLIST - rate limited " f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)" ) return [] self._last_peerlist_request_time = current_time getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""} logger.debug("Sending GETPEERLIST request") await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8")) start_time = asyncio.get_event_loop().time() response = None while True: elapsed = asyncio.get_event_loop().time() - start_time if elapsed > self.timeout: # Timeout without PEERLIST response - directory likely doesn't support it logger.info( f"Timed out waiting for PEERLIST from {self.host}:{self.port} - " "directory likely doesn't support GETPEERLIST (reference implementation)" ) self._peerlist_supported = False return [] try: response_data = await asyncio.wait_for( self.connection.receive(), timeout=self.timeout - elapsed ) response = json.loads(response_data.decode("utf-8")) msg_type = response.get("type") logger.debug(f"Received response type: {msg_type}") if msg_type == MessageType.PEERLIST.value: break logger.debug( f"Skipping unexpected message type {msg_type} while waiting for PEERLIST" ) except TimeoutError: # Timeout without PEERLIST response - directory likely doesn't support it logger.info( f"Timed out waiting for PEERLIST from {self.host}:{self.port} - " "directory likely doesn't support GETPEERLIST (reference implementation)" ) self._peerlist_supported = False return [] except Exception as e: logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}") if asyncio.get_event_loop().time() - start_time > self.timeout: self._peerlist_supported = False return [] peerlist_str = response["line"] logger.debug(f"Peerlist string: {peerlist_str}") # Mark peerlist as supported since we got a valid response self._peerlist_supported = True if not peerlist_str: return [] peers = [] for entry in peerlist_str.split(","): # Skip empty entries if not entry or not entry.strip(): continue # Skip entries without separator - these are metadata (e.g., 'peerlist_features') # from the reference implementation, not actual peer entries if NICK_PEERLOCATOR_SEPARATOR not in entry: logger.debug(f"Skipping metadata entry in peerlist: '{entry}'") continue try: nick, location, disconnected, _features = parse_peerlist_entry(entry) logger.debug(f"Parsed peer: {nick} at {location}, disconnected={disconnected}") if not disconnected: peers.append(nick) except ValueError as e: logger.warning(f"Failed to parse peerlist entry '{entry}': {e}") continue logger.info(f"Received {len(peers)} active peers from {self.host}:{self.port}") return peersFetch the current list of connected peers.
Note: Reference implementation directories do NOT support GETPEERLIST. This method shares peerlist support tracking with get_peerlist_with_features().
Returns
List of active peer nicks. Returns empty list if directory doesn't support GETPEERLIST.
async def get_peerlist_with_features(self) ‑> list[tuple[str, str, FeatureSet]]-
Expand source code
async def get_peerlist_with_features(self) -> list[tuple[str, str, FeatureSet]]: """ Fetch the current list of connected peers with their features. Uses the standard GETPEERLIST message. If the directory supports peerlist_features, the response will include F: suffix with features. Note: Reference implementation directories do NOT support GETPEERLIST. This method tracks whether the directory supports it and skips requests to unsupported directories to avoid spamming warnings in their logs. Returns: List of (nick, location, features) tuples for active peers. Features will be empty for directories that don't support peerlist_features. Returns empty list if directory doesn't support GETPEERLIST. """ if not self.connection: raise DirectoryClientError("Not connected") # Skip if we already know this directory doesn't support GETPEERLIST if self._peerlist_supported is False: logger.debug("Skipping GETPEERLIST - directory doesn't support it") return [] # Rate-limit peerlist requests to avoid spamming import time current_time = time.time() if current_time - self._last_peerlist_request_time < self._peerlist_min_interval: logger.debug( f"Skipping GETPEERLIST - rate limited " f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)" ) return [] self._last_peerlist_request_time = current_time getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""} logger.debug("Sending GETPEERLIST request") await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8")) start_time = asyncio.get_event_loop().time() response = None while True: elapsed = asyncio.get_event_loop().time() - start_time if elapsed > self.timeout: # Timeout without PEERLIST response - directory likely doesn't support it logger.info( f"Timed out waiting for PEERLIST from {self.host}:{self.port} - " "directory likely doesn't support GETPEERLIST (reference implementation)" ) self._peerlist_supported = False return [] try: response_data = await asyncio.wait_for( self.connection.receive(), timeout=self.timeout - elapsed ) response = json.loads(response_data.decode("utf-8")) msg_type = response.get("type") logger.debug(f"Received response type: {msg_type}") if msg_type == MessageType.PEERLIST.value: break logger.debug( f"Skipping unexpected message type {msg_type} while waiting for PEERLIST" ) except TimeoutError: # Timeout without PEERLIST response - directory likely doesn't support it logger.info( f"Timed out waiting for PEERLIST from {self.host}:{self.port} - " "directory likely doesn't support GETPEERLIST (reference implementation)" ) self._peerlist_supported = False return [] except Exception as e: logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}") if asyncio.get_event_loop().time() - start_time > self.timeout: self._peerlist_supported = False return [] peerlist_str = response["line"] logger.debug(f"Peerlist string: {peerlist_str}") # Mark peerlist as supported since we got a valid response self._peerlist_supported = True if not peerlist_str: return [] peers: list[tuple[str, str, FeatureSet]] = [] for entry in peerlist_str.split(","): # Skip empty entries if not entry or not entry.strip(): continue # Skip entries without separator - these are metadata (e.g., 'peerlist_features') # from the reference implementation, not actual peer entries if NICK_PEERLOCATOR_SEPARATOR not in entry: logger.debug(f"Skipping metadata entry in peerlist: '{entry}'") continue try: nick, location, disconnected, features = parse_peerlist_entry(entry) logger.debug( f"Parsed peer: {nick} at {location}, " f"disconnected={disconnected}, features={features.to_comma_string()}" ) if not disconnected: peers.append((nick, location, features)) # Always update peer_features cache to track that we've seen this peer # This prevents triggering "new peer" logic for every message from this peer self.peer_features[nick] = features.to_dict() except ValueError as e: logger.warning(f"Failed to parse peerlist entry '{entry}': {e}") continue logger.info( f"Received {len(peers)} active peers with features from {self.host}:{self.port}" ) return peersFetch the current list of connected peers with their features.
Uses the standard GETPEERLIST message. If the directory supports peerlist_features, the response will include F: suffix with features.
Note: Reference implementation directories do NOT support GETPEERLIST. This method tracks whether the directory supports it and skips requests to unsupported directories to avoid spamming warnings in their logs.
Returns
List of (nick, location, features) tuples for active peers. Features will be empty for directories that don't support peerlist_features. Returns empty list if directory doesn't support GETPEERLIST.
async def listen_continuously(self, request_orderbook: bool = True) ‑> None-
Expand source code
async def listen_continuously(self, request_orderbook: bool = True) -> None: """ Continuously listen for messages and update internal offer/bond caches. This method runs indefinitely until stop() is called or connection is lost. Used by orderbook_watcher and maker to maintain live orderbook state. Args: request_orderbook: If True, send !orderbook request on startup to get current offers from makers. Set to False for maker bots that don't need to receive other offers. """ if not self.connection: raise DirectoryClientError("Not connected") logger.info(f"Starting continuous listening on {self.host}:{self.port}") self.running = True # Fetch peerlist with features to populate peer_features cache # This allows us to know which features each maker supports # Note: This may return empty if directory doesn't support GETPEERLIST (reference impl) try: await self.get_peerlist_with_features() if self._peerlist_supported: logger.info(f"Populated peer_features cache with {len(self.peer_features)} peers") else: logger.info( "Directory doesn't support GETPEERLIST - peer features will be " "learned from offer messages" ) except Exception as e: logger.warning(f"Failed to fetch peerlist with features: {e}") # Request current orderbook from makers if request_orderbook: try: pubmsg = { "type": MessageType.PUBMSG.value, "line": f"{self.nick}!PUBLIC!!orderbook", } await self.connection.send(json.dumps(pubmsg).encode("utf-8")) logger.info("Sent !orderbook request to get current offers") except Exception as e: logger.warning(f"Failed to send !orderbook request: {e}") # Track when we last sent an orderbook request (to avoid spamming) import time last_orderbook_request = time.time() orderbook_request_min_interval = 60.0 # Minimum 60 seconds between requests while self.running: try: # Read next message with timeout data = await asyncio.wait_for(self.connection.receive(), timeout=5.0) if not data: logger.warning(f"Connection to {self.host}:{self.port} closed") break message = json.loads(data.decode("utf-8")) msg_type = message.get("type") line = message.get("line", "") # Process PUBMSG and PRIVMSG to update offers/bonds cache # Reference implementation sends offer responses to !orderbook via PRIVMSG if msg_type in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value): try: parts = line.split(COMMAND_PREFIX) if len(parts) >= 3: from_nick = parts[0] to_nick = parts[1] rest = COMMAND_PREFIX.join(parts[2:]) # Accept PUBLIC broadcasts or messages addressed to us if to_nick == "PUBLIC" or to_nick == self.nick: # If we don't have features for this peer, it's a new peer. # We can try to refresh peerlist, but respect rate limits # and don't spam if directory doesn't support it. is_new_peer = from_nick not in self.peer_features current_time = time.time() if is_new_peer and self._peerlist_supported is not False: # Only refresh peerlist if we haven't recently # (get_peerlist_with_features has its own rate limiting) try: await self.get_peerlist_with_features() if self._peerlist_supported: logger.debug( f"Refreshed peerlist (new peer: {from_nick}), " f"now tracking {len(self.peer_features)} peers" ) except Exception as e: logger.debug(f"Failed to refresh peerlist: {e}") # Request orderbook from new peer (rate-limited) if ( request_orderbook and current_time - last_orderbook_request > orderbook_request_min_interval ): try: pubmsg = { "type": MessageType.PUBMSG.value, "line": f"{self.nick}!PUBLIC!!orderbook", } await self.connection.send( json.dumps(pubmsg).encode("utf-8") ) last_orderbook_request = current_time logger.info( f"Sent !orderbook request for new peer {from_nick}" ) except Exception as e: logger.debug(f"Failed to send !orderbook: {e}") # Parse offer announcements for offer_type_prefix in [ "sw0reloffer", "sw0absoffer", "swreloffer", "swabsoffer", ]: if rest.startswith(offer_type_prefix): # Separate offer from fidelity bond data rest_parts = rest.split(COMMAND_PREFIX, 1) offer_line = rest_parts[0].strip() # Parse fidelity bond if present bond_data = None if len(rest_parts) > 1 and rest_parts[1].startswith( "tbond " ): bond_parts = rest_parts[1][6:].split() if bond_parts: bond_proof_b64 = bond_parts[0] # For PUBLIC announcements, maker uses their own nick # as taker_nick when creating the proof. # For PRIVMSG (response to !orderbook), maker signs # for the recipient (us). taker_nick_for_proof = ( from_nick if to_nick == "PUBLIC" else to_nick ) bond_data = parse_fidelity_bond_proof( bond_proof_b64, from_nick, taker_nick_for_proof ) if bond_data: logger.debug( f"Parsed fidelity bond from {from_nick}: " f"txid={bond_data['utxo_txid'][:16]}..., " f"locktime={bond_data['locktime']}" ) # Store bond in bonds cache utxo_str = ( f"{bond_data['utxo_txid']}:" f"{bond_data['utxo_vout']}" ) bond = FidelityBond( counterparty=from_nick, utxo_txid=bond_data["utxo_txid"], utxo_vout=bond_data["utxo_vout"], locktime=bond_data["locktime"], script=bond_data["utxo_pub"], utxo_confirmations=0, cert_expiry=bond_data["cert_expiry"], fidelity_bond_data=bond_data, ) self.bonds[utxo_str] = bond offer_parts = offer_line.split() if len(offer_parts) >= 6: try: oid = int(offer_parts[1]) minsize = int(offer_parts[2]) maxsize = int(offer_parts[3]) txfee = int(offer_parts[4]) cjfee_str = offer_parts[5] if offer_type_prefix in [ "sw0absoffer", "swabsoffer", ]: cjfee = str(int(cjfee_str)) else: cjfee = str(Decimal(cjfee_str)) offer = Offer( counterparty=from_nick, oid=oid, ordertype=OfferType(offer_type_prefix), minsize=minsize, maxsize=maxsize, txfee=txfee, cjfee=cjfee, fidelity_bond_value=0, fidelity_bond_data=bond_data, features=self.peer_features.get(from_nick, {}), ) # Update cache using tuple key offer_key = (from_nick, oid) self.offers[offer_key] = offer # Track this peer as "known" even if peerlist didn't # return features. This prevents re-triggering new peer # logic for every message from this peer. if from_nick not in self.peer_features: self.peer_features[from_nick] = {} logger.debug( f"Updated offer cache: {from_nick} " f"{offer_type_prefix} oid={oid}" + (" (with bond)" if bond_data else "") ) except Exception as e: logger.debug(f"Failed to parse offer update: {e}") break except Exception as e: logger.debug(f"Failed to process PUBMSG: {e}") except TimeoutError: continue except asyncio.CancelledError: logger.info(f"Continuous listening on {self.host}:{self.port} cancelled") break except Exception as e: logger.error(f"Error in continuous listening: {e}") if self.on_disconnect: self.on_disconnect() break self.running = False logger.info(f"Stopped continuous listening on {self.host}:{self.port}")Continuously listen for messages and update internal offer/bond caches.
This method runs indefinitely until stop() is called or connection is lost. Used by orderbook_watcher and maker to maintain live orderbook state.
Args
request_orderbook- If True, send !orderbook request on startup to get current offers from makers. Set to False for maker bots that don't need to receive other offers.
async def listen_for_messages(self, duration: float = 5.0) ‑> list[dict[str, typing.Any]]-
Expand source code
async def listen_for_messages(self, duration: float = 5.0) -> list[dict[str, Any]]: """ Listen for messages for a specified duration. This method collects all messages received within the specified duration. It properly handles connection closed errors by raising DirectoryClientError. Args: duration: How long to listen in seconds Returns: List of received messages Raises: DirectoryClientError: If not connected or connection is lost """ if not self.connection: raise DirectoryClientError("Not connected") # Check connection state before starting if not self.connection.is_connected(): raise DirectoryClientError("Connection closed") messages: list[dict[str, Any]] = [] start_time = asyncio.get_event_loop().time() while asyncio.get_event_loop().time() - start_time < duration: try: remaining_time = duration - (asyncio.get_event_loop().time() - start_time) if remaining_time <= 0: break response_data = await asyncio.wait_for( self.connection.receive(), timeout=remaining_time ) response = json.loads(response_data.decode("utf-8")) logger.trace( f"Received message type {response.get('type')}: " f"{response.get('line', '')[:80]}..." ) messages.append(response) except TimeoutError: # Normal timeout - no more messages within duration break except Exception as e: # Connection errors should propagate up so caller can reconnect error_msg = str(e).lower() if "connection" in error_msg and ("closed" in error_msg or "lost" in error_msg): raise DirectoryClientError(f"Connection lost: {e}") from e # Other errors (JSON parse, etc) - log and continue logger.warning(f"Error processing message: {e}") continue logger.trace(f"Collected {len(messages)} messages in {duration}s") return messagesListen for messages for a specified duration.
This method collects all messages received within the specified duration. It properly handles connection closed errors by raising DirectoryClientError.
Args
duration- How long to listen in seconds
Returns
List of received messages
Raises
DirectoryClientError- If not connected or connection is lost
async def send_private_message(self, recipient: str, command: str, data: str) ‑> None-
Expand source code
async def send_private_message(self, recipient: str, command: str, data: str) -> None: """ Send a signed private message to a specific peer. JoinMarket requires all private messages to be signed with the sender's nick private key. The signature is appended to the message: Format: "!<command> <data> <pubkey_hex> <signature>" The message-to-sign is: data + hostid (to prevent replay attacks) Note: Only the data is signed, NOT the command prefix. Args: recipient: Target peer nick command: Command name (without ! prefix, e.g., 'fill', 'auth', 'tx') data: Command arguments to send (will be signed) """ if not self.connection: raise DirectoryClientError("Not connected") # Sign just the data (not the command) with our nick identity # Reference: rawmessage = ' '.join(message[1:].split(' ')[1:-2]) # This means they extract [1:-2] which is the args, not the command # So we sign: data + hostid signed_data = self.nick_identity.sign_message(data, self.hostid) # JoinMarket message format: from_nick!to_nick!command <args> # The COMMAND_PREFIX ("!") is used ONLY as a field separator between # from_nick, to_nick, and the message content. The command itself # does NOT have a "!" prefix. # Format: "<command> <signed_data>" where signed_data = "<data> <pubkey_hex> <sig_b64>" full_message = f"{command} {signed_data}" privmsg = { "type": MessageType.PRIVMSG.value, "line": f"{self.nick}!{recipient}!{full_message}", } await self.connection.send(json.dumps(privmsg).encode("utf-8"))Send a signed private message to a specific peer.
JoinMarket requires all private messages to be signed with the sender's nick private key. The signature is appended to the message: Format: "!
" The message-to-sign is: data + hostid (to prevent replay attacks) Note: Only the data is signed, NOT the command prefix.
Args
recipient- Target peer nick
command- Command name (without ! prefix, e.g., 'fill', 'auth', 'tx')
data- Command arguments to send (will be signed)
async def send_public_message(self, message: str) ‑> None-
Expand source code
async def send_public_message(self, message: str) -> None: """ Send a public message to all peers. Args: message: Message to broadcast """ if not self.connection: raise DirectoryClientError("Not connected") pubmsg = { "type": MessageType.PUBMSG.value, "line": f"{self.nick}!PUBLIC!{message}", } await self.connection.send(json.dumps(pubmsg).encode("utf-8"))Send a public message to all peers.
Args
message- Message to broadcast
def stop(self) ‑> None-
Expand source code
def stop(self) -> None: """Stop continuous listening.""" self.running = FalseStop continuous listening.
def supports_extended_utxo_format(self) ‑> bool-
Expand source code
def supports_extended_utxo_format(self) -> bool: """ Check if we should use extended UTXO format with this directory. Extended format (txid:vout:scriptpubkey:blockheight) is used when both sides advertise neutrino_compat feature. Protocol version is not checked - features are negotiated independently. Returns: True if extended UTXO format should be used """ return self.neutrino_compat and self.directory_neutrino_compatCheck if we should use extended UTXO format with this directory.
Extended format (txid:vout:scriptpubkey:blockheight) is used when both sides advertise neutrino_compat feature. Protocol version is not checked - features are negotiated independently.
Returns
True if extended UTXO format should be used
class DirectoryClientError (*args, **kwargs)-
Expand source code
class DirectoryClientError(Exception): """Error raised by DirectoryClient operations."""Error raised by DirectoryClient operations.
Ancestors
- builtins.Exception
- builtins.BaseException