Package jmcore

jmcore - Core library for JoinMarket components

Provides shared functionality for protocol, crypto, and networking.

Sub-modules

jmcore.bitcoin

Bitcoin utilities for JoinMarket …

jmcore.bond_calc

Fidelity bond value calculations.

jmcore.btc_script

Bitcoin script utilities for fidelity bonds …

jmcore.commitment_blacklist

PoDLE commitment blacklist for preventing commitment reuse …

jmcore.config

Base configuration classes for JoinMarket components …

jmcore.confirmation

User confirmation prompts for fund-moving operations.

jmcore.constants

Bitcoin and JoinMarket protocol constants …

jmcore.crypto

Cryptographic primitives for JoinMarket.

jmcore.directory_client

Shared DirectoryClient for connecting to JoinMarket directory nodes …

jmcore.encryption

End-to-end encryption wrapper using NaCl public-key authenticated encryption …

jmcore.mempool_api

Mempool.space API client for Bitcoin blockchain queries.

jmcore.models

Core data models using Pydantic for validation and serialization.

jmcore.network

Network primitives and connection management.

jmcore.paths

Shared path utilities for JoinMarket data directories …

jmcore.podle

Proof of Discrete Log Equivalence (PoDLE) for JoinMarket …

jmcore.protocol

JoinMarket protocol definitions, message types, and serialization …

jmcore.rate_limiter

Per-peer rate limiting using token bucket algorithm …

jmcore.tor_control

Tor control port client for creating ephemeral hidden services …

Functions

def add_commitment(commitment: str, persist: bool = True) ‑> bool
Expand source code
def add_commitment(commitment: str, persist: bool = True) -> bool:
    """
    Add a commitment to the global blacklist.

    Convenience function that uses the global blacklist.

    Args:
        commitment: The commitment hash (hex string)
        persist: If True, save to disk immediately

    Returns:
        True if the commitment was newly added, False if already present
    """
    return get_blacklist().add(commitment, persist=persist)

Add a commitment to the global blacklist.

Convenience function that uses the global blacklist.

Args

commitment
The commitment hash (hex string)
persist
If True, save to disk immediately

Returns

True if the commitment was newly added, False if already present

def address_to_scriptpubkey(address: str) ‑> bytes
Expand source code
def address_to_scriptpubkey(address: str) -> bytes:
    """
    Convert Bitcoin address to scriptPubKey.

    Supports:
    - P2WPKH (bc1q..., tb1q..., bcrt1q...)
    - P2WSH (bc1q... 62 chars)
    - P2TR (bc1p... taproot)
    - P2PKH (1..., m..., n...)
    - P2SH (3..., 2...)

    Args:
        address: Bitcoin address string

    Returns:
        scriptPubKey bytes
    """
    # Bech32 (SegWit) addresses
    if address.startswith(("bc1", "tb1", "bcrt1")):
        hrp_end = 4 if address.startswith("bcrt") else 2
        hrp = address[:hrp_end]

        bech32_decoded = bech32_lib.decode(hrp, address)
        if bech32_decoded[0] is None or bech32_decoded[1] is None:
            raise ValueError(f"Invalid bech32 address: {address}")

        witver = bech32_decoded[0]
        witprog = bytes(bech32_decoded[1])

        if witver == 0:
            if len(witprog) == 20:
                # P2WPKH: OP_0 <20-byte-pubkeyhash>
                return bytes([0x00, 0x14]) + witprog
            elif len(witprog) == 32:
                # P2WSH: OP_0 <32-byte-scripthash>
                return bytes([0x00, 0x20]) + witprog
        elif witver == 1 and len(witprog) == 32:
            # P2TR: OP_1 <32-byte-pubkey>
            return bytes([0x51, 0x20]) + witprog

        raise ValueError(f"Unsupported witness version: {witver}")

    # Base58 addresses (legacy)
    decoded = base58.b58decode_check(address)
    version = decoded[0]
    payload = decoded[1:]

    if version in (0x00, 0x6F):  # Mainnet/Testnet P2PKH
        # P2PKH: OP_DUP OP_HASH160 <20-byte-pubkeyhash> OP_EQUALVERIFY OP_CHECKSIG
        return bytes([0x76, 0xA9, 0x14]) + payload + bytes([0x88, 0xAC])
    elif version in (0x05, 0xC4):  # Mainnet/Testnet P2SH
        # P2SH: OP_HASH160 <20-byte-scripthash> OP_EQUAL
        return bytes([0xA9, 0x14]) + payload + bytes([0x87])

    raise ValueError(f"Unknown address version: {version}")

Convert Bitcoin address to scriptPubKey.

Supports:
- P2WPKH (bc1q…, tb1q…, bcrt1q…)
- P2WSH (bc1q…, 62 chars)
- P2TR (bc1p…, taproot)
- P2PKH (1…, m…, n…)
- P2SH (3…, 2…)

Args

address
Bitcoin address string

Returns

scriptPubKey bytes
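
A minimal usage sketch (not part of the generated docs), assuming these functions are importable directly from jmcore as listed on this page; the address is the BIP173 P2WPKH test vector, not a real wallet address:

from jmcore import address_to_scriptpubkey

# BIP173 test-vector P2WPKH address (illustrative input)
spk = address_to_scriptpubkey("bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4")
print(spk.hex())  # "0014" followed by the 20-byte pubkey hash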

def check_and_add_commitment(commitment: str, persist: bool = True) ‑> bool
Expand source code
def check_and_add_commitment(commitment: str, persist: bool = True) -> bool:
    """
    Check if a commitment is allowed and add it to the blacklist.

    Convenience function that uses the global blacklist.
    This is the primary function to use during CoinJoin processing.

    Args:
        commitment: The commitment hash (hex string)
        persist: If True, save to disk immediately after adding

    Returns:
        True if the commitment is NEW (allowed), False if already blacklisted
    """
    return get_blacklist().check_and_add(commitment, persist=persist)

Check if a commitment is allowed and add it to the blacklist.

Convenience function that uses the global blacklist. This is the primary function to use during CoinJoin processing.

Args

commitment
The commitment hash (hex string)
persist
If True, save to disk immediately after adding

Returns

True if the commitment is NEW (allowed), False if already blacklisted
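
A hedged usage sketch of the convenience wrappers; it assumes package-level imports from jmcore and uses an illustrative commitment value and a throwaway data directory:

import tempfile
from pathlib import Path
from jmcore import set_blacklist_path, check_and_add_commitment

with tempfile.TemporaryDirectory() as tmp:
    # Point the global singleton at a scratch directory before first use
    set_blacklist_path(data_dir=Path(tmp))

    commitment = "ab" * 32  # illustrative 64-char hex commitment
    if check_and_add_commitment(commitment):
        print("fresh commitment - proceed with the CoinJoin")
    else:
        print("commitment reused - reject the request")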

def check_commitment(commitment: str) ‑> bool
Expand source code
def check_commitment(commitment: str) -> bool:
    """
    Check if a commitment is allowed (not blacklisted).

    Convenience function that uses the global blacklist.

    Args:
        commitment: The commitment hash (hex string)

    Returns:
        True if the commitment is allowed, False if blacklisted
    """
    return not get_blacklist().is_blacklisted(commitment)

Check if a commitment is allowed (not blacklisted).

Convenience function that uses the global blacklist.

Args

commitment
The commitment hash (hex string)

Returns

True if the commitment is allowed, False if blacklisted

def create_p2wpkh_script_code(pubkey: bytes | str) ‑> bytes
Expand source code
def create_p2wpkh_script_code(pubkey: bytes | str) -> bytes:
    """
    Create scriptCode for P2WPKH signing (BIP143).

    For P2WPKH, the scriptCode is the P2PKH script:
    OP_DUP OP_HASH160 <20-byte-pubkeyhash> OP_EQUALVERIFY OP_CHECKSIG

    Args:
        pubkey: Public key bytes or hex

    Returns:
        25-byte scriptCode
    """
    if isinstance(pubkey, str):
        pubkey = bytes.fromhex(pubkey)

    pubkey_hash = hash160(pubkey)
    # OP_DUP OP_HASH160 PUSH20 <pkh> OP_EQUALVERIFY OP_CHECKSIG
    return b"\x76\xa9\x14" + pubkey_hash + b"\x88\xac"

Create scriptCode for P2WPKH signing (BIP143).

For P2WPKH, the scriptCode is the P2PKH script: OP_DUP OP_HASH160 <20-byte-pubkeyhash> OP_EQUALVERIFY OP_CHECKSIG

Args

pubkey
Public key bytes or hex

Returns

25-byte scriptCode

def decode_varint(data: bytes, offset: int = 0) ‑> tuple[int, int]
Expand source code
def decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]:
    """
    Decode Bitcoin varint from bytes.

    Args:
        data: Input bytes
        offset: Starting offset in data

    Returns:
        (value, new_offset) tuple
    """
    first = data[offset]
    if first < 0xFD:
        return first, offset + 1
    elif first == 0xFD:
        return struct.unpack("<H", data[offset + 1 : offset + 3])[0], offset + 3
    elif first == 0xFE:
        return struct.unpack("<I", data[offset + 1 : offset + 5])[0], offset + 5
    else:
        return struct.unpack("<Q", data[offset + 1 : offset + 9])[0], offset + 9

Decode Bitcoin varint from bytes.

Args

data
Input bytes
offset
Starting offset in data

Returns

(value, new_offset) tuple

def deserialize_revelation(revelation_str: str) ‑> dict[str, typing.Any] | None
Expand source code
def deserialize_revelation(revelation_str: str) -> dict[str, Any] | None:
    """
    Deserialize PoDLE revelation from wire format.

    Format: P|P2|sig|e|utxo (pipe-separated hex strings)
    """
    try:
        parts = revelation_str.split("|")
        if len(parts) != 5:
            logger.warning(f"Invalid revelation format: expected 5 parts, got {len(parts)}")
            return None

        return {
            "P": parts[0],
            "P2": parts[1],
            "sig": parts[2],
            "e": parts[3],
            "utxo": parts[4],
        }

    except Exception as e:
        logger.error(f"Failed to deserialize PoDLE revelation: {e}")
        return None

Deserialize PoDLE revelation from wire format.

Format: P|P2|sig|e|utxo (pipe-separated hex strings)

def encode_varint(n: int) ‑> bytes
Expand source code
def encode_varint(n: int) -> bytes:
    """
    Encode integer as Bitcoin varint.

    Args:
        n: Integer to encode

    Returns:
        Encoded bytes
    """
    if n < 0xFD:
        return bytes([n])
    elif n <= 0xFFFF:
        return bytes([0xFD]) + struct.pack("<H", n)
    elif n <= 0xFFFFFFFF:
        return bytes([0xFE]) + struct.pack("<I", n)
    else:
        return bytes([0xFF]) + struct.pack("<Q", n)

Encode integer as Bitcoin varint.

Args

n
Integer to encode

Returns

Encoded bytes
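
A small round-trip sketch covering encode_varint() together with decode_varint() above, assuming both are importable from jmcore:

from jmcore import encode_varint, decode_varint

# Exercise each varint size class (1, 3, 5 and 9 bytes)
for n in (0, 252, 253, 65535, 70000, 2**32):
    encoded = encode_varint(n)
    value, offset = decode_varint(encoded)
    assert value == n and offset == len(encoded)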

def format_utxo_list(utxos: list[UTXOMetadata],
extended: bool = False) ‑> str
Expand source code
def format_utxo_list(utxos: list[UTXOMetadata], extended: bool = False) -> str:
    """
    Format a list of UTXOs as comma-separated string.

    Args:
        utxos: List of UTXOMetadata objects
        extended: If True, use extended format with scriptpubkey:blockheight

    Returns:
        Comma-separated UTXO string
    """
    if extended:
        return ",".join(u.to_extended_str() for u in utxos)
    else:
        return ",".join(u.to_legacy_str() for u in utxos)

Format a list of UTXOs as comma-separated string.

Args

utxos
List of UTXOMetadata objects
extended
If True, use extended format with scriptpubkey:blockheight

Returns

Comma-separated UTXO string

def generate_podle(private_key_bytes: bytes, utxo_str: str, index: int = 0) ‑> PoDLECommitment
Expand source code
def generate_podle(
    private_key_bytes: bytes,
    utxo_str: str,
    index: int = 0,
) -> PoDLECommitment:
    """
    Generate a PoDLE commitment for a UTXO.

    The PoDLE proves that the taker owns the UTXO without revealing
    the private key. It creates a zero-knowledge proof that:
    P = k*G and P2 = k*J have the same discrete log k.

    Args:
        private_key_bytes: 32-byte private key
        utxo_str: UTXO reference as "txid:vout"
        index: NUMS point index (0-9)

    Returns:
        PoDLECommitment with all proof data
    """
    if len(private_key_bytes) != 32:
        raise PoDLEError(f"Invalid private key length: {len(private_key_bytes)}")

    if index not in PRECOMPUTED_NUMS:
        raise PoDLEError(f"Invalid NUMS index: {index}")

    # Get private key as integer
    k = int.from_bytes(private_key_bytes, "big")
    if k == 0 or k >= SECP256K1_N:
        raise PoDLEError("Invalid private key value")

    # Calculate P = k*G (standard public key)
    p_point = scalar_mult_g(k)
    p_bytes = point_to_bytes(p_point)

    # Get NUMS point J
    j_point = get_nums_point(index)

    # Calculate P2 = k*J
    p2_point = point_mult(k, j_point)
    p2_bytes = point_to_bytes(p2_point)

    # Generate commitment C = H(P2)
    commitment = hashlib.sha256(p2_bytes).digest()

    # Generate Schnorr-like proof
    # Choose random nonce k_proof
    k_proof = int.from_bytes(secrets.token_bytes(32), "big") % SECP256K1_N
    if k_proof == 0:
        k_proof = 1

    # Kg = k_proof * G
    kg_point = scalar_mult_g(k_proof)
    kg_bytes = point_to_bytes(kg_point)

    # Kj = k_proof * J
    kj_point = point_mult(k_proof, j_point)
    kj_bytes = point_to_bytes(kj_point)

    # Challenge e = H(Kg || Kj || P || P2)
    e_bytes = hashlib.sha256(kg_bytes + kj_bytes + p_bytes + p2_bytes).digest()
    e = int.from_bytes(e_bytes, "big") % SECP256K1_N

    # Response s = k_proof + e * k (mod n) - JAM compatible
    s = (k_proof + e * k) % SECP256K1_N
    s_bytes = s.to_bytes(32, "big")

    logger.debug(
        f"Generated PoDLE for {utxo_str} using NUMS index {index}, "
        f"commitment={commitment.hex()[:16]}..."
    )

    return PoDLECommitment(
        commitment=commitment,
        p=p_bytes,
        p2=p2_bytes,
        sig=s_bytes,
        e=e_bytes,
        utxo=utxo_str,
        index=index,
    )

Generate a PoDLE commitment for a UTXO.

The PoDLE proves that the taker owns the UTXO without revealing the private key. It creates a zero-knowledge proof that P = k*G and P2 = k*J have the same discrete log k.

Args

private_key_bytes
32-byte private key
utxo_str
UTXO reference as "txid:vout"
index
NUMS point index (0-9)

Returns

PoDLECommitment with all proof data
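
A hedged generation sketch, assuming package-level imports from jmcore and that PoDLECommitment exposes the fields passed to it (commitment, p, p2, sig, e, utxo, index) as attributes; the key and UTXO reference are illustrative:

import secrets
from jmcore import generate_podle

priv = secrets.token_bytes(32)   # random scalar; valid with overwhelming probability
utxo = "aa" * 32 + ":0"          # dummy txid:vout reference
pod = generate_podle(priv, utxo, index=0)
print(pod.commitment.hex())      # H(P2), the value sent as the commitment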

def get_blacklist(blacklist_path: Path | None = None, data_dir: Path | None = None) ‑> CommitmentBlacklist
Expand source code
def get_blacklist(
    blacklist_path: Path | None = None, data_dir: Path | None = None
) -> CommitmentBlacklist:
    """
    Get the global commitment blacklist instance.

    Args:
        blacklist_path: Path to the blacklist file. Only used on first call
                       to initialize the singleton.
        data_dir: Data directory for JoinMarket. Only used on first call
                 to initialize the singleton.

    Returns:
        The global CommitmentBlacklist instance
    """
    global _global_blacklist

    with _global_blacklist_lock:
        if _global_blacklist is None:
            _global_blacklist = CommitmentBlacklist(blacklist_path, data_dir)
        return _global_blacklist

Get the global commitment blacklist instance.

Args

blacklist_path
Path to the blacklist file. Only used on first call to initialize the singleton.
data_dir
Data directory for JoinMarket. Only used on first call to initialize the singleton.

Returns

The global CommitmentBlacklist instance

def get_commitment_blacklist_path(data_dir: Path | None = None) ‑> pathlib.Path
Expand source code
def get_commitment_blacklist_path(data_dir: Path | None = None) -> Path:
    """
    Get the path to the commitment blacklist file.

    Args:
        data_dir: Optional data directory (defaults to get_default_data_dir())

    Returns:
        Path to cmtdata/commitmentlist (compatible with reference JoinMarket)
    """
    if data_dir is None:
        data_dir = get_default_data_dir()

    # Use cmtdata/ subdirectory for commitment data (matches reference implementation)
    cmtdata_dir = data_dir / "cmtdata"
    cmtdata_dir.mkdir(parents=True, exist_ok=True)

    return cmtdata_dir / "commitmentlist"

Get the path to the commitment blacklist file.

Args

data_dir
Optional data directory (defaults to get_default_data_dir())

Returns

Path to cmtdata/commitmentlist (compatible with reference JoinMarket)

def get_default_data_dir() ‑> pathlib.Path
Expand source code
def get_default_data_dir() -> Path:
    """
    Get the default JoinMarket data directory.

    Returns ~/.joinmarket-ng or $JOINMARKET_DATA_DIR if set.
    Creates the directory if it doesn't exist.

    For compatibility with reference JoinMarket in Docker, users can
    set JOINMARKET_DATA_DIR=/home/jm/.joinmarket-ng to share the same volume.
    """
    env_path = os.getenv("JOINMARKET_DATA_DIR")
    data_dir = Path(env_path) if env_path else Path.home() / ".joinmarket-ng"

    data_dir.mkdir(parents=True, exist_ok=True)
    return data_dir

Get the default JoinMarket data directory.

Returns ~/.joinmarket-ng or $JOINMARKET_DATA_DIR if set. Creates the directory if it doesn't exist.

For compatibility with reference JoinMarket in Docker, users can set JOINMARKET_DATA_DIR=/home/jm/.joinmarket-ng to share the same volume.

def get_default_directory_nodes(network: NetworkType) ‑> list[str]
Expand source code
def get_default_directory_nodes(network: NetworkType) -> list[str]:
    """Get default directory nodes for a given network."""
    if network == NetworkType.MAINNET:
        return DIRECTORY_NODES_MAINNET.copy()
    elif network == NetworkType.SIGNET:
        return DIRECTORY_NODES_SIGNET.copy()
    elif network == NetworkType.TESTNET:
        return DIRECTORY_NODES_TESTNET.copy()
    # Regtest has no default directory nodes - must be configured
    return []

Get default directory nodes for a given network.

def get_hrp(network: str | NetworkType) ‑> str
Expand source code
def get_hrp(network: str | NetworkType) -> str:
    """
    Get bech32 human-readable part for network.

    Args:
        network: Network type (string or enum)

    Returns:
        HRP string (bc, tb, bcrt)
    """
    if isinstance(network, str):
        network = NetworkType(network)
    return HRP_MAP[network]

Get bech32 human-readable part for network.

Args

network
Network type (string or enum)

Returns

HRP string (bc, tb, bcrt)

def get_nick_version(nick: str) ‑> int
Expand source code
def get_nick_version(nick: str) -> int:
    """
    Extract protocol version from a JoinMarket nick.

    Nick format: J{version}{hash} where version is a single digit.
    Example: J5abc123... (v5)

    Returns JM_VERSION (5) if version cannot be determined.
    """
    if nick and len(nick) >= 2 and nick[0] == "J" and nick[1].isdigit():
        return int(nick[1])
    return JM_VERSION

Extract protocol version from a JoinMarket nick.

Nick format: J{version}{hash} where version is a single digit. Example: J5abc123… (v5)

Returns JM_VERSION (5) if version cannot be determined.

def get_txid(tx_hex: str) ‑> str
Expand source code
def get_txid(tx_hex: str) -> str:
    """
    Calculate transaction ID (double SHA256 of non-witness data).

    Args:
        tx_hex: Transaction hex

    Returns:
        Transaction ID as hex string
    """
    parsed = parse_transaction(tx_hex)

    # Serialize without witness for txid calculation
    data = serialize_transaction(
        version=parsed.version,
        inputs=parsed.inputs,
        outputs=parsed.outputs,
        locktime=parsed.locktime,
        witnesses=None,  # No witnesses for txid
    )

    return hash256(data)[::-1].hex()

Calculate transaction ID (double SHA256 of non-witness data).

Args

tx_hex
Transaction hex

Returns

Transaction ID as hex string

def hash160(data: bytes) ‑> bytes
Expand source code
def hash160(data: bytes) -> bytes:
    """
    RIPEMD160(SHA256(data)) - Used for Bitcoin addresses.

    Args:
        data: Input data to hash

    Returns:
        20-byte hash
    """
    return hashlib.new("ripemd160", hashlib.sha256(data).digest()).digest()

RIPEMD160(SHA256(data)) - Used for Bitcoin addresses.

Args

data
Input data to hash

Returns

20-byte hash

def hash256(data: bytes) ‑> bytes
Expand source code
def hash256(data: bytes) -> bytes:
    """
    SHA256(SHA256(data)) - Used for Bitcoin txids and block hashes.

    Args:
        data: Input data to hash

    Returns:
        32-byte hash
    """
    return hashlib.sha256(hashlib.sha256(data).digest()).digest()

SHA256(SHA256(data)) - Used for Bitcoin txids and block hashes.

Args

data
Input data to hash

Returns

32-byte hash

def parse_podle_revelation(revelation: dict[str, Any]) ‑> dict[str, typing.Any] | None
Expand source code
def parse_podle_revelation(revelation: dict[str, Any]) -> dict[str, Any] | None:
    """
    Parse and validate PoDLE revelation structure.

    Expected format from taker:
    {
        'P': <hex string>,
        'P2': <hex string>,
        'sig': <hex string>,
        'e': <hex string>,
        'utxo': <txid:vout or txid:vout:scriptpubkey:blockheight string>
    }

    Returns parsed structure with bytes, or None if invalid.
    Extended format includes scriptpubkey and blockheight for neutrino_compat feature.
    """
    try:
        required_fields = ["P", "P2", "sig", "e", "utxo"]
        for field in required_fields:
            if field not in revelation:
                logger.warning(f"Missing required field in PoDLE revelation: {field}")
                return None

        p_bytes = bytes.fromhex(revelation["P"])
        p2_bytes = bytes.fromhex(revelation["P2"])
        sig_bytes = bytes.fromhex(revelation["sig"])
        e_bytes = bytes.fromhex(revelation["e"])

        utxo_parts = revelation["utxo"].split(":")

        # Legacy format: txid:vout (2 parts)
        # Extended format: txid:vout:scriptpubkey:blockheight (4 parts)
        if len(utxo_parts) == 2:
            txid = utxo_parts[0]
            vout = int(utxo_parts[1])
            scriptpubkey = None
            blockheight = None
        elif len(utxo_parts) == 4:
            txid = utxo_parts[0]
            vout = int(utxo_parts[1])
            scriptpubkey = utxo_parts[2]
            blockheight = int(utxo_parts[3])
            logger.debug(f"Parsed extended UTXO format: {txid}:{vout} with metadata")
        else:
            logger.warning(f"Invalid UTXO format: {revelation['utxo']}")
            return None

        result: dict[str, Any] = {
            "P": p_bytes,
            "P2": p2_bytes,
            "sig": sig_bytes,
            "e": e_bytes,
            "txid": txid,
            "vout": vout,
        }

        # Add extended metadata if present
        if scriptpubkey is not None:
            result["scriptpubkey"] = scriptpubkey
        if blockheight is not None:
            result["blockheight"] = blockheight

        return result

    except Exception as e:
        logger.error(f"Failed to parse PoDLE revelation: {e}")
        return None

Parse and validate PoDLE revelation structure.

Expected format from taker:

{
    'P': <hex string>,
    'P2': <hex string>,
    'sig': <hex string>,
    'e': <hex string>,
    'utxo': <txid:vout or txid:vout:scriptpubkey:blockheight string>
}

Returns parsed structure with bytes, or None if invalid. Extended format includes scriptpubkey and blockheight for neutrino_compat feature.

def parse_transaction(tx_hex: str) ‑> ParsedTransaction
Expand source code
def parse_transaction(tx_hex: str) -> ParsedTransaction:
    """
    Parse a Bitcoin transaction from hex.

    Handles both SegWit and non-SegWit formats.

    Args:
        tx_hex: Transaction hex string

    Returns:
        ParsedTransaction object
    """
    tx_bytes = bytes.fromhex(tx_hex)
    offset = 0

    # Version
    version = struct.unpack("<I", tx_bytes[offset : offset + 4])[0]
    offset += 4

    # Check for SegWit marker
    marker = tx_bytes[offset]
    flag = tx_bytes[offset + 1]
    has_witness = marker == 0x00 and flag == 0x01
    if has_witness:
        offset += 2

    # Inputs
    input_count, offset = decode_varint(tx_bytes, offset)
    inputs = []
    for _ in range(input_count):
        txid = tx_bytes[offset : offset + 32][::-1].hex()
        offset += 32
        vout = struct.unpack("<I", tx_bytes[offset : offset + 4])[0]
        offset += 4
        script_len, offset = decode_varint(tx_bytes, offset)
        scriptsig = tx_bytes[offset : offset + script_len].hex()
        offset += script_len
        sequence = struct.unpack("<I", tx_bytes[offset : offset + 4])[0]
        offset += 4
        inputs.append({"txid": txid, "vout": vout, "scriptsig": scriptsig, "sequence": sequence})

    # Outputs
    output_count, offset = decode_varint(tx_bytes, offset)
    outputs = []
    for _ in range(output_count):
        value = struct.unpack("<Q", tx_bytes[offset : offset + 8])[0]
        offset += 8
        script_len, offset = decode_varint(tx_bytes, offset)
        scriptpubkey = tx_bytes[offset : offset + script_len].hex()
        offset += script_len
        outputs.append({"value": value, "scriptpubkey": scriptpubkey})

    # Witnesses
    witnesses: list[list[bytes]] = []
    if has_witness:
        for _ in range(input_count):
            wit_count, offset = decode_varint(tx_bytes, offset)
            wit_items = []
            for _ in range(wit_count):
                item_len, offset = decode_varint(tx_bytes, offset)
                wit_items.append(tx_bytes[offset : offset + item_len])
                offset += item_len
            witnesses.append(wit_items)

    # Locktime
    locktime = struct.unpack("<I", tx_bytes[offset : offset + 4])[0]

    return ParsedTransaction(
        version=version,
        inputs=inputs,
        outputs=outputs,
        witnesses=witnesses,
        locktime=locktime,
        has_witness=has_witness,
    )

Parse a Bitcoin transaction from hex.

Handles both SegWit and non-SegWit formats.

Args

tx_hex
Transaction hex string

Returns

ParsedTransaction object

def parse_utxo_list(utxo_list_str: str, require_metadata: bool = False) ‑> list[UTXOMetadata]
Expand source code
def parse_utxo_list(utxo_list_str: str, require_metadata: bool = False) -> list[UTXOMetadata]:
    """
    Parse a comma-separated list of UTXOs.

    Args:
        utxo_list_str: Comma-separated UTXOs (legacy or extended format)
        require_metadata: If True, raise error if any UTXO lacks Neutrino metadata

    Returns:
        List of UTXOMetadata objects
    """
    if not utxo_list_str:
        return []

    utxos = []
    for utxo_str in utxo_list_str.split(","):
        utxo = UTXOMetadata.from_str(utxo_str.strip())
        if require_metadata and not utxo.has_neutrino_metadata():
            raise ValueError(f"UTXO {utxo.to_legacy_str()} missing Neutrino metadata")
        utxos.append(utxo)
    return utxos

Parse a comma-separated list of UTXOs.

Args

utxo_list_str
Comma-separated UTXOs (legacy or extended format)
require_metadata
If True, raise error if any UTXO lacks Neutrino metadata

Returns

List of UTXOMetadata objects
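
A round-trip sketch for parse_utxo_list() and format_utxo_list(), assuming UTXOMetadata (from jmcore.models) round-trips the legacy "txid:vout" form unchanged:

from jmcore import parse_utxo_list, format_utxo_list

legacy = "aa" * 32 + ":0," + "bb" * 32 + ":1"   # two dummy UTXOs, legacy format
utxos = parse_utxo_list(legacy)
assert format_utxo_list(utxos) == legacy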

def peer_supports_neutrino_compat(handshake_data: dict[str, Any]) ‑> bool
Expand source code
def peer_supports_neutrino_compat(handshake_data: dict[str, Any]) -> bool:
    """
    Check if a peer supports Neutrino-compatible UTXO metadata.

    Args:
        handshake_data: Handshake payload from peer

    Returns:
        True if peer advertises neutrino_compat feature
    """
    features = handshake_data.get("features", {})
    return features.get(FEATURE_NEUTRINO_COMPAT, False)

Check if a peer supports Neutrino-compatible UTXO metadata.

Args

handshake_data
Handshake payload from peer

Returns

True if peer advertises neutrino_compat feature

def pubkey_to_p2wpkh_address(pubkey: bytes | str, network: str | NetworkType = 'mainnet') ‑> str
Expand source code
@validate_call
def pubkey_to_p2wpkh_address(pubkey: bytes | str, network: str | NetworkType = "mainnet") -> str:
    """
    Convert compressed public key to P2WPKH (native SegWit) address.

    Args:
        pubkey: 33-byte compressed public key (bytes or hex string)
        network: Network type

    Returns:
        Bech32 P2WPKH address
    """
    if isinstance(pubkey, str):
        pubkey = bytes.fromhex(pubkey)

    if len(pubkey) != 33:
        raise ValueError(f"Invalid compressed pubkey length: {len(pubkey)}")

    pubkey_hash = hash160(pubkey)
    hrp = get_hrp(network)

    result = bech32_lib.encode(hrp, 0, pubkey_hash)
    if result is None:
        raise ValueError("Failed to encode bech32 address")
    return result

Convert compressed public key to P2WPKH (native SegWit) address.

Args

pubkey
33-byte compressed public key (bytes or hex string)
network
Network type

Returns

Bech32 P2WPKH address
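
A minimal sketch, assuming jmcore package-level imports; the public key is a placeholder of the right length (33 bytes) rather than a point on the curve, which is sufficient here because only its HASH160 is used:

from jmcore import pubkey_to_p2wpkh_address, pubkey_to_p2wpkh_script

pubkey_hex = "02" + "11" * 32    # illustrative compressed-pubkey-shaped bytes
addr = pubkey_to_p2wpkh_address(pubkey_hex, network="mainnet")
spk = pubkey_to_p2wpkh_script(pubkey_hex)
print(addr, spk.hex())           # bc1q... address and its 0014... scriptPubKey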

def pubkey_to_p2wpkh_script(pubkey: bytes | str) ‑> bytes
Expand source code
def pubkey_to_p2wpkh_script(pubkey: bytes | str) -> bytes:
    """
    Create P2WPKH scriptPubKey from public key.

    Args:
        pubkey: 33-byte compressed public key (bytes or hex string)

    Returns:
        22-byte P2WPKH scriptPubKey (OP_0 <20-byte-hash>)
    """
    if isinstance(pubkey, str):
        pubkey = bytes.fromhex(pubkey)

    pubkey_hash = hash160(pubkey)
    return bytes([0x00, 0x14]) + pubkey_hash

Create P2WPKH scriptPubKey from public key.

Args

pubkey
33-byte compressed public key (bytes or hex string)

Returns

22-byte P2WPKH scriptPubKey (OP_0 <20-byte-hash>)

def script_to_p2wsh_address(script: bytes, network: str | NetworkType = 'mainnet') ‑> str
Expand source code
@validate_call
def script_to_p2wsh_address(script: bytes, network: str | NetworkType = "mainnet") -> str:
    """
    Convert witness script to P2WSH address.

    Args:
        script: Witness script bytes
        network: Network type

    Returns:
        Bech32 P2WSH address
    """
    script_hash = sha256(script)
    hrp = get_hrp(network)

    result = bech32_lib.encode(hrp, 0, script_hash)
    if result is None:
        raise ValueError("Failed to encode bech32 address")
    return result

Convert witness script to P2WSH address.

Args

script
Witness script bytes
network
Network type

Returns

Bech32 P2WSH address

def script_to_p2wsh_scriptpubkey(script: bytes) ‑> bytes
Expand source code
def script_to_p2wsh_scriptpubkey(script: bytes) -> bytes:
    """
    Create P2WSH scriptPubKey from witness script.

    Args:
        script: Witness script bytes

    Returns:
        34-byte P2WSH scriptPubKey (OP_0 <32-byte-hash>)
    """
    script_hash = sha256(script)
    return bytes([0x00, 0x20]) + script_hash

Create P2WSH scriptPubKey from witness script.

Args

script
Witness script bytes

Returns

34-byte P2WSH scriptPubKey (OP_0 <32-byte-hash>)

def scriptpubkey_to_address(scriptpubkey: bytes, network: str | NetworkType = 'mainnet') ‑> str
Expand source code
@validate_call
def scriptpubkey_to_address(scriptpubkey: bytes, network: str | NetworkType = "mainnet") -> str:
    """
    Convert scriptPubKey to address.

    Supports P2WPKH, P2WSH, P2TR, P2PKH, P2SH.

    Args:
        scriptpubkey: scriptPubKey bytes
        network: Network type

    Returns:
        Bitcoin address string
    """
    if isinstance(network, str):
        network = NetworkType(network)

    hrp = get_hrp(network)

    # P2WPKH
    if len(scriptpubkey) == 22 and scriptpubkey[0] == 0x00 and scriptpubkey[1] == 0x14:
        result = bech32_lib.encode(hrp, 0, scriptpubkey[2:])
        if result is None:
            raise ValueError(f"Failed to encode P2WPKH address: {scriptpubkey.hex()}")
        return result

    # P2WSH
    if len(scriptpubkey) == 34 and scriptpubkey[0] == 0x00 and scriptpubkey[1] == 0x20:
        result = bech32_lib.encode(hrp, 0, scriptpubkey[2:])
        if result is None:
            raise ValueError(f"Failed to encode P2WSH address: {scriptpubkey.hex()}")
        return result

    # P2TR
    if len(scriptpubkey) == 34 and scriptpubkey[0] == 0x51 and scriptpubkey[1] == 0x20:
        result = bech32_lib.encode(hrp, 1, scriptpubkey[2:])
        if result is None:
            raise ValueError(f"Failed to encode P2TR address: {scriptpubkey.hex()}")
        return result

    # P2PKH
    if (
        len(scriptpubkey) == 25
        and scriptpubkey[0] == 0x76
        and scriptpubkey[1] == 0xA9
        and scriptpubkey[2] == 0x14
        and scriptpubkey[23] == 0x88
        and scriptpubkey[24] == 0xAC
    ):
        payload = bytes([P2PKH_VERSION[network]]) + scriptpubkey[3:23]
        return base58.b58encode_check(payload).decode("ascii")

    # P2SH
    if (
        len(scriptpubkey) == 23
        and scriptpubkey[0] == 0xA9
        and scriptpubkey[1] == 0x14
        and scriptpubkey[22] == 0x87
    ):
        payload = bytes([P2SH_VERSION[network]]) + scriptpubkey[2:22]
        return base58.b58encode_check(payload).decode("ascii")

    raise ValueError(f"Unsupported scriptPubKey: {scriptpubkey.hex()}")

Convert scriptPubKey to address.

Supports P2WPKH, P2WSH, P2TR, P2PKH, P2SH.

Args

scriptpubkey
scriptPubKey bytes
network
Network type

Returns

Bitcoin address string
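
A round-trip sketch pairing this with address_to_scriptpubkey(), assuming both are importable from jmcore; the address is the BIP173 test vector:

from jmcore import address_to_scriptpubkey, scriptpubkey_to_address

addr = "bc1qw508d6qejxtdg4y5r3zarvary0c5xw7kv8f3t4"
spk = address_to_scriptpubkey(addr)
assert scriptpubkey_to_address(spk, "mainnet") == addr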

def serialize_outpoint(txid: str, vout: int) ‑> bytes
Expand source code
def serialize_outpoint(txid: str, vout: int) -> bytes:
    """
    Serialize outpoint (txid:vout).

    Args:
        txid: Transaction ID in RPC format (big-endian hex)
        vout: Output index

    Returns:
        36-byte outpoint (little-endian txid + 4-byte vout)
    """
    txid_bytes = bytes.fromhex(txid)[::-1]
    return txid_bytes + struct.pack("<I", vout)

Serialize outpoint (txid:vout).

Args

txid
Transaction ID in RPC format (big-endian hex)
vout
Output index

Returns

36-byte outpoint (little-endian txid + 4-byte vout)

def serialize_revelation(commitment: PoDLECommitment) ‑> str
Expand source code
def serialize_revelation(commitment: PoDLECommitment) -> str:
    """
    Serialize PoDLE revelation to wire format.

    Format: P|P2|sig|e|utxo (pipe-separated hex strings)
    """
    return "|".join(
        [
            commitment.p.hex(),
            commitment.p2.hex(),
            commitment.sig.hex(),
            commitment.e.hex(),
            commitment.utxo,
        ]
    )

Serialize PoDLE revelation to wire format.

Format: P|P2|sig|e|utxo (pipe-separated hex strings)
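
A hedged round-trip sketch combining serialize_revelation() with generate_podle() and deserialize_revelation() documented above, assuming jmcore package-level imports:

import secrets
from jmcore import generate_podle, serialize_revelation, deserialize_revelation

pod = generate_podle(secrets.token_bytes(32), "cc" * 32 + ":0")
wire = serialize_revelation(pod)       # "P|P2|sig|e|utxo"
parsed = deserialize_revelation(wire)  # back to a dict of hex strings
assert parsed is not None and parsed["utxo"] == pod.utxo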

def serialize_transaction(version: int,
inputs: list[dict[str, Any]],
outputs: list[dict[str, Any]],
locktime: int,
witnesses: list[list[bytes]] | None = None) ‑> bytes
Expand source code
def serialize_transaction(
    version: int,
    inputs: list[dict[str, Any]],
    outputs: list[dict[str, Any]],
    locktime: int,
    witnesses: list[list[bytes]] | None = None,
) -> bytes:
    """
    Serialize a Bitcoin transaction.

    Args:
        version: Transaction version
        inputs: List of input dicts
        outputs: List of output dicts
        locktime: Transaction locktime
        witnesses: Optional list of witness stacks

    Returns:
        Serialized transaction bytes
    """
    has_witness = witnesses is not None and any(w for w in witnesses)

    result = struct.pack("<I", version)

    if has_witness:
        result += bytes([0x00, 0x01])  # SegWit marker and flag

    # Inputs
    result += encode_varint(len(inputs))
    for inp in inputs:
        result += bytes.fromhex(inp["txid"])[::-1]
        result += struct.pack("<I", inp["vout"])
        scriptsig = bytes.fromhex(inp.get("scriptsig", ""))
        result += encode_varint(len(scriptsig))
        result += scriptsig
        result += struct.pack("<I", inp.get("sequence", 0xFFFFFFFF))

    # Outputs
    result += encode_varint(len(outputs))
    for out in outputs:
        result += struct.pack("<Q", out["value"])
        scriptpubkey = bytes.fromhex(out["scriptpubkey"])
        result += encode_varint(len(scriptpubkey))
        result += scriptpubkey

    # Witnesses
    if has_witness and witnesses:
        for witness in witnesses:
            result += encode_varint(len(witness))
            for item in witness:
                result += encode_varint(len(item))
                result += item

    result += struct.pack("<I", locktime)
    return result

Serialize a Bitcoin transaction.

Args

version
Transaction version
inputs
List of input dicts
outputs
List of output dicts
locktime
Transaction locktime
witnesses
Optional list of witness stacks

Returns

Serialized transaction bytes
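
A self-contained round-trip sketch for serialize_transaction(), parse_transaction() and get_txid(), assuming jmcore package-level imports; the input and output values are dummies:

from jmcore import serialize_transaction, parse_transaction, get_txid

inputs = [{"txid": "00" * 32, "vout": 0, "scriptsig": "", "sequence": 0xFFFFFFFF}]
outputs = [{"value": 50_000, "scriptpubkey": "0014" + "11" * 20}]
raw = serialize_transaction(version=2, inputs=inputs, outputs=outputs, locktime=0)

parsed = parse_transaction(raw.hex())
assert parsed.outputs[0]["value"] == 50_000 and not parsed.has_witness
print(get_txid(raw.hex()))   # double-SHA256 of the non-witness serialization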

def set_blacklist_path(blacklist_path: Path | None = None, data_dir: Path | None = None) ‑> None
Expand source code
def set_blacklist_path(blacklist_path: Path | None = None, data_dir: Path | None = None) -> None:
    """
    Set the path for the global blacklist.

    Must be called before any blacklist operations. If the blacklist
    has already been initialized, this will reinitialize it with the new path.

    Args:
        blacklist_path: Explicit path to blacklist file
        data_dir: Data directory (used if blacklist_path is None)
    """
    global _global_blacklist

    with _global_blacklist_lock:
        _global_blacklist = CommitmentBlacklist(blacklist_path, data_dir)
        logger.info(f"Set blacklist path to {_global_blacklist.blacklist_path}")

Set the path for the global blacklist.

Must be called before any blacklist operations. If the blacklist has already been initialized, this will reinitialize it with the new path.

Args

blacklist_path
Explicit path to blacklist file
data_dir
Data directory (used if blacklist_path is None)

def sha256(data: bytes) ‑> bytes
Expand source code
def sha256(data: bytes) -> bytes:
    """
    Single SHA256 hash.

    Args:
        data: Input data to hash

    Returns:
        32-byte hash
    """
    return hashlib.sha256(data).digest()

Single SHA256 hash.

Args

data
Input data to hash

Returns

32-byte hash

def verify_podle(p: bytes,
p2: bytes,
sig: bytes,
e: bytes,
commitment: bytes,
index_range: range = range(0, 10)) ‑> tuple[bool, str]
Expand source code
def verify_podle(
    p: bytes,
    p2: bytes,
    sig: bytes,
    e: bytes,
    commitment: bytes,
    index_range: range = range(10),
) -> tuple[bool, str]:
    """
    Verify PoDLE proof.

    Verifies that P and P2 have the same discrete log (private key)
    without revealing the private key itself.

    Args:
        p: Public key bytes (33 bytes compressed)
        p2: Commitment public key bytes (33 bytes compressed)
        sig: Signature s value (32 bytes)
        e: Challenge e value (32 bytes)
        commitment: sha256(P2) commitment (32 bytes)
        index_range: Allowed NUMS indices to try

    Returns:
        (is_valid, error_message)
    """
    try:
        if len(p) != 33:
            return False, f"Invalid P length: {len(p)}, expected 33"
        if len(p2) != 33:
            return False, f"Invalid P2 length: {len(p2)}, expected 33"
        if len(sig) != 32:
            return False, f"Invalid sig length: {len(sig)}, expected 32"
        if len(e) != 32:
            return False, f"Invalid e length: {len(e)}, expected 32"
        if len(commitment) != 32:
            return False, f"Invalid commitment length: {len(commitment)}, expected 32"

        expected_commitment = hashlib.sha256(p2).digest()
        if commitment != expected_commitment:
            return False, "Commitment does not match H(P2)"

        p_point = PublicKey(p)
        p2_point = PublicKey(p2)

        s_int = int.from_bytes(sig, "big")
        e_int = int.from_bytes(e, "big")

        if s_int >= SECP256K1_N or e_int >= SECP256K1_N:
            return False, "Signature values out of range"

        # sg = s * G
        sg = scalar_mult_g(s_int) if s_int > 0 else None

        # Compute -e mod N for subtraction (JAM compatible: s = k + e*x, verify Kg = s*G - e*P)
        minus_e_int = (-e_int) % SECP256K1_N

        for index in index_range:
            try:
                j = get_nums_point(index)

                # Kg = s*G - e*P = s*G + (-e)*P (JAM compatible verification)
                minus_e_p = point_mult(minus_e_int, p_point)
                kg = point_add(sg, minus_e_p) if sg is not None else minus_e_p

                # Kj = s*J - e*P2 = s*J + (-e)*P2
                minus_e_p2 = point_mult(minus_e_int, p2_point)
                if s_int > 0:
                    sj = point_mult(s_int, j)
                    kj = point_add(sj, minus_e_p2)
                else:
                    kj = minus_e_p2

                kg_bytes = point_to_bytes(kg)
                kj_bytes = point_to_bytes(kj)

                e_check = hashlib.sha256(kg_bytes + kj_bytes + p + p2).digest()

                if e_check == e:
                    logger.debug(f"PoDLE verification successful at index {index}")
                    return True, ""

            except Exception as ex:
                logger.debug(f"PoDLE verification failed at index {index}: {ex}")
                continue

        return False, f"PoDLE verification failed for all indices in {index_range}"

    except Exception as ex:
        logger.error(f"PoDLE verification error: {ex}")
        return False, f"Verification error: {ex}"

Verify PoDLE proof.

Verifies that P and P2 have the same discrete log (private key) without revealing the private key itself.

Args

p
Public key bytes (33 bytes compressed)
p2
Commitment public key bytes (33 bytes compressed)
sig
Signature s value (32 bytes)
e
Challenge e value (32 bytes)
commitment
sha256(P2) commitment (32 bytes)
index_range
Allowed NUMS indices to try

Returns

(is_valid, error_message)
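
A generate-and-verify sketch, assuming jmcore package-level imports; because the proof does not state which NUMS point was used, verification simply retries the indices in index_range until one matches:

import secrets
from jmcore import generate_podle, verify_podle

priv = secrets.token_bytes(32)
pod = generate_podle(priv, "dd" * 32 + ":0", index=1)
ok, err = verify_podle(pod.p, pod.p2, pod.sig, pod.e, pod.commitment)
assert ok, err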

Classes

class BackendConfig (**data: Any)
Expand source code
class BackendConfig(BaseModel):
    """
    Configuration for Bitcoin backend connection.

    Supports different backend types:
    - full_node: Bitcoin Core RPC
    - neutrino: Light client using BIP 157/158
    """

    backend_type: str = Field(
        default="full_node",
        description="Backend type: 'full_node' or 'neutrino'",
    )
    backend_config: dict[str, Any] = Field(
        default_factory=dict,
        description="Backend-specific configuration (RPC credentials, neutrino peers, etc.)",
    )

    model_config = {"frozen": False}

Configuration for Bitcoin backend connection.

Supports different backend types:
- full_node: Bitcoin Core RPC
- neutrino: Light client using BIP 157/158

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

Ancestors

  • pydantic.main.BaseModel

Class variables

var backend_config : dict[str, typing.Any]

Backend-specific configuration (RPC credentials, neutrino peers, etc.).

var backend_type : str

Backend type: 'full_node' or 'neutrino'.

var model_config

Pydantic model configuration ({"frozen": False}).
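
A construction sketch, assuming BackendConfig is importable from jmcore; the keys inside backend_config are hypothetical, since the model accepts an arbitrary dict:

from jmcore import BackendConfig

cfg = BackendConfig(
    backend_type="neutrino",
    backend_config={"peers": ["neutrino.example.org:8333"]},  # illustrative settings
)
print(cfg.backend_type)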

class CommitmentBlacklist (blacklist_path: Path | None = None, data_dir: Path | None = None)
Expand source code
class CommitmentBlacklist:
    """
    Thread-safe commitment blacklist with file persistence.

    The blacklist is stored as a simple text file with one commitment per line.
    This matches the reference implementation's format for compatibility.
    """

    def __init__(self, blacklist_path: Path | None = None, data_dir: Path | None = None):
        """
        Initialize the commitment blacklist.

        Args:
            blacklist_path: Path to the blacklist file. If None, uses data_dir.
            data_dir: Data directory for JoinMarket (defaults to get_default_data_dir()).
                     Only used if blacklist_path is None.
        """
        if blacklist_path is None:
            blacklist_path = get_commitment_blacklist_path(data_dir)
        self.blacklist_path = blacklist_path

        # In-memory cache of blacklisted commitments
        self._commitments: set[str] = set()
        self._lock = threading.Lock()

        # Load existing blacklist from disk
        self._load_from_disk()

    def _load_from_disk(self) -> None:
        """Load blacklist from disk into memory."""
        if not self.blacklist_path.exists():
            logger.debug(f"No existing blacklist at {self.blacklist_path}")
            return

        try:
            with open(self.blacklist_path, encoding="ascii") as f:
                for line in f:
                    commitment = line.strip()
                    if commitment:
                        self._commitments.add(commitment)
            logger.info(f"Loaded {len(self._commitments)} commitments from blacklist")
        except Exception as e:
            logger.error(f"Failed to load blacklist from {self.blacklist_path}: {e}")

    def _save_to_disk(self) -> None:
        """Save in-memory blacklist to disk."""
        try:
            # Ensure parent directory exists
            self.blacklist_path.parent.mkdir(parents=True, exist_ok=True)

            with open(self.blacklist_path, "w", encoding="ascii") as f:
                for commitment in sorted(self._commitments):
                    f.write(commitment + "\n")
                f.flush()
            logger.debug(f"Saved {len(self._commitments)} commitments to blacklist")
        except Exception as e:
            logger.error(f"Failed to save blacklist to {self.blacklist_path}: {e}")

    def is_blacklisted(self, commitment: str) -> bool:
        """
        Check if a commitment is blacklisted.

        Args:
            commitment: The commitment hash (hex string, typically 64 chars)

        Returns:
            True if the commitment is blacklisted, False otherwise
        """
        # Normalize commitment (strip whitespace, lowercase)
        commitment = commitment.strip().lower()

        with self._lock:
            return commitment in self._commitments

    def add(self, commitment: str, persist: bool = True) -> bool:
        """
        Add a commitment to the blacklist.

        Args:
            commitment: The commitment hash (hex string)
            persist: If True, save to disk immediately

        Returns:
            True if the commitment was newly added, False if already present
        """
        # Normalize commitment
        commitment = commitment.strip().lower()

        if not commitment:
            logger.warning("Attempted to add empty commitment to blacklist")
            return False

        with self._lock:
            if commitment in self._commitments:
                return False

            self._commitments.add(commitment)
            logger.debug(f"Added commitment to blacklist: {commitment[:16]}...")

            if persist:
                self._save_to_disk()

            return True

    def check_and_add(self, commitment: str, persist: bool = True) -> bool:
        """
        Check if a commitment is blacklisted, and if not, add it.

        This is the primary method for handling commitments during CoinJoin.
        It atomically checks and adds in a single operation.

        Args:
            commitment: The commitment hash (hex string)
            persist: If True, save to disk immediately after adding

        Returns:
            True if the commitment is NEW (allowed), False if already blacklisted
        """
        # Normalize commitment
        commitment = commitment.strip().lower()

        if not commitment:
            logger.warning("Attempted to check empty commitment")
            return False

        with self._lock:
            if commitment in self._commitments:
                logger.info(f"Commitment already blacklisted: {commitment[:16]}...")
                return False

            self._commitments.add(commitment)
            logger.debug(f"Added commitment to blacklist: {commitment[:16]}...")

            if persist:
                self._save_to_disk()

            return True

    def __len__(self) -> int:
        """Return the number of blacklisted commitments."""
        with self._lock:
            return len(self._commitments)

    def __contains__(self, commitment: str) -> bool:
        """Check if a commitment is blacklisted using 'in' operator."""
        return self.is_blacklisted(commitment)

Thread-safe commitment blacklist with file persistence.

The blacklist is stored as a simple text file with one commitment per line. This matches the reference implementation's format for compatibility.

Initialize the commitment blacklist.

Args

blacklist_path
Path to the blacklist file. If None, uses data_dir.
data_dir
Data directory for JoinMarket (defaults to get_default_data_dir()). Only used if blacklist_path is None.

Methods

def add(self, commitment: str, persist: bool = True) ‑> bool
Expand source code
def add(self, commitment: str, persist: bool = True) -> bool:
    """
    Add a commitment to the blacklist.

    Args:
        commitment: The commitment hash (hex string)
        persist: If True, save to disk immediately

    Returns:
        True if the commitment was newly added, False if already present
    """
    # Normalize commitment
    commitment = commitment.strip().lower()

    if not commitment:
        logger.warning("Attempted to add empty commitment to blacklist")
        return False

    with self._lock:
        if commitment in self._commitments:
            return False

        self._commitments.add(commitment)
        logger.debug(f"Added commitment to blacklist: {commitment[:16]}...")

        if persist:
            self._save_to_disk()

        return True

Add a commitment to the blacklist.

Args

commitment
The commitment hash (hex string)
persist
If True, save to disk immediately

Returns

True if the commitment was newly added, False if already present

def check_and_add(self, commitment: str, persist: bool = True) ‑> bool
Expand source code
def check_and_add(self, commitment: str, persist: bool = True) -> bool:
    """
    Check if a commitment is blacklisted, and if not, add it.

    This is the primary method for handling commitments during CoinJoin.
    It atomically checks and adds in a single operation.

    Args:
        commitment: The commitment hash (hex string)
        persist: If True, save to disk immediately after adding

    Returns:
        True if the commitment is NEW (allowed), False if already blacklisted
    """
    # Normalize commitment
    commitment = commitment.strip().lower()

    if not commitment:
        logger.warning("Attempted to check empty commitment")
        return False

    with self._lock:
        if commitment in self._commitments:
            logger.info(f"Commitment already blacklisted: {commitment[:16]}...")
            return False

        self._commitments.add(commitment)
        logger.debug(f"Added commitment to blacklist: {commitment[:16]}...")

        if persist:
            self._save_to_disk()

        return True

Check if a commitment is blacklisted, and if not, add it.

This is the primary method for handling commitments during CoinJoin. It atomically checks and adds in a single operation.

Args

commitment
The commitment hash (hex string)
persist
If True, save to disk immediately after adding

Returns

True if the commitment is NEW (allowed), False if already blacklisted

def is_blacklisted(self, commitment: str) ‑> bool
Expand source code
def is_blacklisted(self, commitment: str) -> bool:
    """
    Check if a commitment is blacklisted.

    Args:
        commitment: The commitment hash (hex string, typically 64 chars)

    Returns:
        True if the commitment is blacklisted, False otherwise
    """
    # Normalize commitment (strip whitespace, lowercase)
    commitment = commitment.strip().lower()

    with self._lock:
        return commitment in self._commitments

Check if a commitment is blacklisted.

Args

commitment
The commitment hash (hex string, typically 64 chars)

Returns

True if the commitment is blacklisted, False otherwise
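
A hedged usage sketch instantiating the blacklist directly against a temporary data directory (the global singleton helpers above are the usual entry point):

import tempfile
from pathlib import Path
from jmcore import CommitmentBlacklist

with tempfile.TemporaryDirectory() as tmp:
    bl = CommitmentBlacklist(data_dir=Path(tmp))
    assert bl.check_and_add("cd" * 32)        # first use: allowed and recorded
    assert not bl.check_and_add("cd" * 32)    # replay: rejected
    assert ("cd" * 32) in bl and len(bl) == 1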

class CryptoSession
Expand source code
class CryptoSession:
    """
    Manages encryption state for a coinjoin session with a taker.
    """

    def __init__(self) -> None:
        """Initialize a new crypto session with a fresh keypair."""
        self.keypair: SecretKey = init_keypair()
        self.box: Box | None = None
        self.counterparty_pubkey: str = ""

    def get_pubkey_hex(self) -> str:
        """Get our public key as hex string."""
        pk = get_pubkey(self.keypair, as_hex=True)
        assert isinstance(pk, str)
        return pk

    def setup_encryption(self, counterparty_pubkey_hex: str) -> None:
        """
        Set up encryption with a counterparty's public key.

        Args:
            counterparty_pubkey_hex: Counterparty's public key in hex.
        """
        try:
            counterparty_pk = init_pubkey(counterparty_pubkey_hex)
            self.box = create_encryption_box(self.keypair, counterparty_pk)
            self.counterparty_pubkey = counterparty_pubkey_hex
            logger.debug("Set up encryption box with counterparty")
        except NaclError as e:
            logger.error(f"Failed to set up encryption: {e}")
            raise

    def encrypt(self, message: str) -> str:
        """
        Encrypt a message for the counterparty.

        Args:
            message: Plaintext message.

        Returns:
            Base64-encoded encrypted message.
        """
        if self.box is None:
            raise NaclError("Encryption not set up - call setup_encryption first")
        return encrypt_encode(message, self.box)

    def decrypt(self, message: str) -> str:
        """
        Decrypt a message from the counterparty.

        Args:
            message: Base64-encoded encrypted message.

        Returns:
            Decrypted plaintext.
        """
        if self.box is None:
            raise NaclError("Encryption not set up - call setup_encryption first")
        decrypted = decode_decrypt(message, self.box)
        return decrypted.decode("utf-8")

    @property
    def is_encrypted(self) -> bool:
        """Check if encryption has been set up."""
        return self.box is not None

Manages encryption state for a coinjoin session with a taker.

Initialize a new crypto session with a fresh keypair.

Instance variables

prop is_encrypted : bool
Expand source code
@property
def is_encrypted(self) -> bool:
    """Check if encryption has been set up."""
    return self.box is not None

Check if encryption has been set up.

Methods

def decrypt(self, message: str) ‑> str
Expand source code
def decrypt(self, message: str) -> str:
    """
    Decrypt a message from the counterparty.

    Args:
        message: Base64-encoded encrypted message.

    Returns:
        Decrypted plaintext.
    """
    if self.box is None:
        raise NaclError("Encryption not set up - call setup_encryption first")
    decrypted = decode_decrypt(message, self.box)
    return decrypted.decode("utf-8")

Decrypt a message from the counterparty.

Args

message
Base64-encoded encrypted message.

Returns

Decrypted plaintext.

def encrypt(self, message: str) ‑> str
Expand source code
def encrypt(self, message: str) -> str:
    """
    Encrypt a message for the counterparty.

    Args:
        message: Plaintext message.

    Returns:
        Base64-encoded encrypted message.
    """
    if self.box is None:
        raise NaclError("Encryption not set up - call setup_encryption first")
    return encrypt_encode(message, self.box)

Encrypt a message for the counterparty.

Args

message
Plaintext message.

Returns

Base64-encoded encrypted message.

def get_pubkey_hex(self) ‑> str
Expand source code
def get_pubkey_hex(self) -> str:
    """Get our public key as hex string."""
    pk = get_pubkey(self.keypair, as_hex=True)
    assert isinstance(pk, str)
    return pk

Get our public key as hex string.

def setup_encryption(self, counterparty_pubkey_hex: str) ‑> None
Expand source code
def setup_encryption(self, counterparty_pubkey_hex: str) -> None:
    """
    Set up encryption with a counterparty's public key.

    Args:
        counterparty_pubkey_hex: Counterparty's public key in hex.
    """
    try:
        counterparty_pk = init_pubkey(counterparty_pubkey_hex)
        self.box = create_encryption_box(self.keypair, counterparty_pk)
        self.counterparty_pubkey = counterparty_pubkey_hex
        logger.debug("Set up encryption box with counterparty")
    except NaclError as e:
        logger.error(f"Failed to set up encryption: {e}")
        raise

Set up encryption with a counterparty's public key.

Args

counterparty_pubkey_hex
Counterparty's public key in hex.
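
A two-party sketch, assuming CryptoSession is importable from jmcore; the NaCl box is symmetric between the two derived keypairs, so either side can decrypt what the other encrypts:

from jmcore import CryptoSession

taker, maker = CryptoSession(), CryptoSession()
taker.setup_encryption(maker.get_pubkey_hex())
maker.setup_encryption(taker.get_pubkey_hex())

ciphertext = taker.encrypt("hello maker")     # base64-encoded NaCl box
assert maker.decrypt(ciphertext) == "hello maker"
assert taker.is_encrypted and maker.is_encrypted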

class DirectoryClient (host: str,
port: int,
network: str,
nick_identity: NickIdentity | None = None,
location: str = 'NOT-SERVING-ONION',
socks_host: str = '127.0.0.1',
socks_port: int = 9050,
timeout: float = 30.0,
max_message_size: int = 2097152,
on_disconnect: Callable[[], None] | None = None,
neutrino_compat: bool = False)
Expand source code
class DirectoryClient:
    """
    Client for connecting to JoinMarket directory servers.

    Supports:
    - Direct TCP connections (for local/dev)
    - Tor connections (for .onion addresses)
    - Handshake protocol
    - Peerlist fetching
    - Orderbook fetching
    - Continuous listening for updates
    """

    def __init__(
        self,
        host: str,
        port: int,
        network: str,
        nick_identity: NickIdentity | None = None,
        location: str = "NOT-SERVING-ONION",
        socks_host: str = "127.0.0.1",
        socks_port: int = 9050,
        timeout: float = 30.0,
        max_message_size: int = 2097152,
        on_disconnect: Callable[[], None] | None = None,
        neutrino_compat: bool = False,
    ) -> None:
        """
        Initialize DirectoryClient.

        Args:
            host: Directory server hostname or .onion address
            port: Directory server port
            network: Bitcoin network (mainnet, testnet, signet, regtest)
            nick_identity: NickIdentity for message signing (generated if None)
            location: Our location string (onion address or NOT-SERVING-ONION)
            socks_host: SOCKS proxy host for Tor
            socks_port: SOCKS proxy port for Tor
            timeout: Connection timeout in seconds
            max_message_size: Maximum message size in bytes
            on_disconnect: Callback when connection drops
            neutrino_compat: Advertise support for Neutrino-compatible UTXO metadata
        """
        self.host = host
        self.port = port
        self.network = network
        self.location = location
        self.socks_host = socks_host
        self.socks_port = socks_port
        self.timeout = timeout
        self.max_message_size = max_message_size
        self.connection: TCPConnection | None = None
        self.nick_identity = nick_identity or NickIdentity(JM_VERSION)
        self.nick = self.nick_identity.nick
        # hostid is used for message signing to prevent replay attacks
        # For onion-based networks, this is always "onion-network"
        self.hostid = "onion-network"
        self.offers: dict[tuple[str, int], Offer] = {}
        self.bonds: dict[str, FidelityBond] = {}
        self.peer_features: dict[str, dict[str, bool]] = {}  # nick -> features dict
        self.running = False
        self.on_disconnect = on_disconnect
        self.initial_orderbook_received = False
        self.last_orderbook_request_time: float = 0.0
        self.last_offer_received_time: float | None = None
        self.neutrino_compat = neutrino_compat

        # Version negotiation state (set after handshake)
        self.negotiated_version: int | None = None
        self.directory_neutrino_compat: bool = False
        self.directory_peerlist_features: bool = False  # True if directory supports F: suffix

        # Timing intervals
        self.peerlist_check_interval = 1800.0
        self.orderbook_refresh_interval = 1800.0
        self.orderbook_retry_interval = 300.0
        self.zero_offer_retry_interval = 600.0

        # Peerlist support tracking
        # If the directory doesn't support getpeerlist (e.g., reference implementation),
        # we track this to avoid spamming unsupported requests
        self._peerlist_supported: bool | None = None  # None = unknown, True/False = known
        self._last_peerlist_request_time: float = 0.0
        self._peerlist_min_interval: float = 60.0  # Minimum seconds between peerlist requests

    async def connect(self) -> None:
        """Connect to the directory server and perform handshake."""
        try:
            logger.debug(f"DirectoryClient.connect: connecting to {self.host}:{self.port}")
            if not self.host.endswith(".onion"):
                self.connection = await connect_direct(
                    self.host,
                    self.port,
                    self.max_message_size,
                    self.timeout,
                )
                logger.debug("DirectoryClient.connect: direct connection established")
            else:
                self.connection = await connect_via_tor(
                    self.host,
                    self.port,
                    self.socks_host,
                    self.socks_port,
                    self.max_message_size,
                    self.timeout,
                )
                logger.debug("DirectoryClient.connect: tor connection established")
            logger.debug("DirectoryClient.connect: starting handshake")
            await self._handshake()
            logger.debug("DirectoryClient.connect: handshake complete")
        except Exception as e:
            logger.error(f"Failed to connect to {self.host}:{self.port}: {e}", exc_info=True)
            # Clean up connection if handshake failed
            if self.connection:
                with contextlib.suppress(Exception):
                    await self.connection.close()
                self.connection = None
            raise DirectoryClientError(f"Connection failed: {e}") from e

    async def _handshake(self) -> None:
        """
        Perform directory server handshake with feature negotiation.

        We use proto-ver=5 for reference implementation compatibility.
        Features like neutrino_compat are negotiated independently via
        the features dict in the handshake payload.
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        # Build our feature set - always include peerlist_features to indicate we support
        # the extended peerlist format with F: suffix for feature information
        our_features: set[str] = {FEATURE_PEERLIST_FEATURES}
        if self.neutrino_compat:
            our_features.add(FEATURE_NEUTRINO_COMPAT)
        feature_set = FeatureSet(features=our_features)

        # Send our handshake with current version and features
        handshake_data = create_handshake_request(
            nick=self.nick,
            location=self.location,
            network=self.network,
            directory=False,
            features=feature_set,
        )
        logger.debug(f"DirectoryClient._handshake: created handshake data: {handshake_data}")
        handshake_msg = {
            "type": MessageType.HANDSHAKE.value,
            "line": json.dumps(handshake_data),
        }
        logger.debug("DirectoryClient._handshake: sending handshake message")
        await self.connection.send(json.dumps(handshake_msg).encode("utf-8"))
        logger.debug("DirectoryClient._handshake: handshake sent, waiting for response")

        # Receive and parse directory's response
        response_data = await asyncio.wait_for(self.connection.receive(), timeout=self.timeout)
        logger.debug(f"DirectoryClient._handshake: received response: {response_data[:200]!r}")
        response = json.loads(response_data.decode("utf-8"))

        if response["type"] not in (MessageType.HANDSHAKE.value, MessageType.DN_HANDSHAKE.value):
            raise DirectoryClientError(f"Unexpected response type: {response['type']}")

        handshake_response = json.loads(response["line"])
        if not handshake_response.get("accepted", False):
            raise DirectoryClientError("Handshake rejected")

        # Extract directory's version range
        # Reference directories only send "proto-ver" (single value, typically 5)
        dir_ver_min = handshake_response.get("proto-ver-min")
        dir_ver_max = handshake_response.get("proto-ver-max")

        if dir_ver_min is None or dir_ver_max is None:
            # Reference directory: only sends single proto-ver
            dir_version = handshake_response.get("proto-ver", 5)
            dir_ver_min = dir_ver_max = dir_version

        # Verify compatibility with our version (we only support v5)
        if not (dir_ver_min <= JM_VERSION <= dir_ver_max):
            raise DirectoryClientError(
                f"No compatible protocol version: we support v{JM_VERSION}, "
                f"directory supports [{dir_ver_min}, {dir_ver_max}]"
            )

        # Use v5 (our only supported version)
        self.negotiated_version = JM_VERSION

        # Check if directory supports Neutrino-compatible metadata
        self.directory_neutrino_compat = peer_supports_neutrino_compat(handshake_response)

        # Check if directory supports peerlist_features (extended peerlist with F: suffix)
        dir_features = handshake_response.get("features", {})
        self.directory_peerlist_features = dir_features.get(FEATURE_PEERLIST_FEATURES, False)

        logger.info(
            f"Handshake successful with {self.host}:{self.port} (nick: {self.nick}, "
            f"negotiated_version: v{self.negotiated_version}, "
            f"neutrino_compat: {self.directory_neutrino_compat}, "
            f"peerlist_features: {self.directory_peerlist_features})"
        )

    async def get_peerlist(self) -> list[str]:
        """
        Fetch the current list of connected peers.

        Note: Reference implementation directories do NOT support GETPEERLIST.
        This method shares peerlist support tracking with get_peerlist_with_features().

        Returns:
            List of active peer nicks. Returns empty list if directory doesn't
            support GETPEERLIST.
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        # Skip if we already know this directory doesn't support GETPEERLIST
        if self._peerlist_supported is False:
            logger.debug("Skipping GETPEERLIST - directory doesn't support it")
            return []

        # Rate-limit peerlist requests to avoid spamming
        import time

        current_time = time.time()
        if current_time - self._last_peerlist_request_time < self._peerlist_min_interval:
            logger.debug(
                f"Skipping GETPEERLIST - rate limited "
                f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)"
            )
            return []

        self._last_peerlist_request_time = current_time

        getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""}
        logger.debug("Sending GETPEERLIST request")
        await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8"))

        start_time = asyncio.get_event_loop().time()
        response = None

        while True:
            elapsed = asyncio.get_event_loop().time() - start_time
            if elapsed > self.timeout:
                # Timeout without PEERLIST response - directory likely doesn't support it
                logger.info(
                    f"Timed out waiting for PEERLIST from {self.host}:{self.port} - "
                    "directory likely doesn't support GETPEERLIST (reference implementation)"
                )
                self._peerlist_supported = False
                return []

            try:
                response_data = await asyncio.wait_for(
                    self.connection.receive(), timeout=self.timeout - elapsed
                )
                response = json.loads(response_data.decode("utf-8"))
                msg_type = response.get("type")
                logger.debug(f"Received response type: {msg_type}")

                if msg_type == MessageType.PEERLIST.value:
                    break

                logger.debug(
                    f"Skipping unexpected message type {msg_type} while waiting for PEERLIST"
                )
            except TimeoutError:
                # Timeout without PEERLIST response - directory likely doesn't support it
                logger.info(
                    f"Timed out waiting for PEERLIST from {self.host}:{self.port} - "
                    "directory likely doesn't support GETPEERLIST (reference implementation)"
                )
                self._peerlist_supported = False
                return []
            except Exception as e:
                logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}")
                if asyncio.get_event_loop().time() - start_time > self.timeout:
                    self._peerlist_supported = False
                    return []

        peerlist_str = response["line"]
        logger.debug(f"Peerlist string: {peerlist_str}")

        # Mark peerlist as supported since we got a valid response
        self._peerlist_supported = True

        if not peerlist_str:
            return []

        peers = []
        for entry in peerlist_str.split(","):
            # Skip empty entries
            if not entry or not entry.strip():
                continue
            # Skip entries without separator - these are metadata (e.g., 'peerlist_features')
            # from the reference implementation, not actual peer entries
            if NICK_PEERLOCATOR_SEPARATOR not in entry:
                logger.debug(f"Skipping metadata entry in peerlist: '{entry}'")
                continue
            try:
                nick, location, disconnected, _features = parse_peerlist_entry(entry)
                logger.debug(f"Parsed peer: {nick} at {location}, disconnected={disconnected}")
                if not disconnected:
                    peers.append(nick)
            except ValueError as e:
                logger.warning(f"Failed to parse peerlist entry '{entry}': {e}")
                continue

        logger.info(f"Received {len(peers)} active peers from {self.host}:{self.port}")
        return peers

    async def get_peerlist_with_features(self) -> list[tuple[str, str, FeatureSet]]:
        """
        Fetch the current list of connected peers with their features.

        Uses the standard GETPEERLIST message. If the directory supports
        peerlist_features, the response will include F: suffix with features.

        Note: Reference implementation directories do NOT support GETPEERLIST.
        This method tracks whether the directory supports it and skips requests
        to unsupported directories to avoid spamming warnings in their logs.

        Returns:
            List of (nick, location, features) tuples for active peers.
            Features will be empty for directories that don't support peerlist_features.
            Returns empty list if directory doesn't support GETPEERLIST.
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        # Skip if we already know this directory doesn't support GETPEERLIST
        if self._peerlist_supported is False:
            logger.debug("Skipping GETPEERLIST - directory doesn't support it")
            return []

        # Rate-limit peerlist requests to avoid spamming
        import time

        current_time = time.time()
        if current_time - self._last_peerlist_request_time < self._peerlist_min_interval:
            logger.debug(
                f"Skipping GETPEERLIST - rate limited "
                f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)"
            )
            return []

        self._last_peerlist_request_time = current_time

        getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""}
        logger.debug("Sending GETPEERLIST request")
        await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8"))

        start_time = asyncio.get_event_loop().time()
        response = None

        while True:
            elapsed = asyncio.get_event_loop().time() - start_time
            if elapsed > self.timeout:
                # Timeout without PEERLIST response - directory likely doesn't support it
                logger.info(
                    f"Timed out waiting for PEERLIST from {self.host}:{self.port} - "
                    "directory likely doesn't support GETPEERLIST (reference implementation)"
                )
                self._peerlist_supported = False
                return []

            try:
                response_data = await asyncio.wait_for(
                    self.connection.receive(), timeout=self.timeout - elapsed
                )
                response = json.loads(response_data.decode("utf-8"))
                msg_type = response.get("type")
                logger.debug(f"Received response type: {msg_type}")

                if msg_type == MessageType.PEERLIST.value:
                    break

                logger.debug(
                    f"Skipping unexpected message type {msg_type} while waiting for PEERLIST"
                )
            except TimeoutError:
                # Timeout without PEERLIST response - directory likely doesn't support it
                logger.info(
                    f"Timed out waiting for PEERLIST from {self.host}:{self.port} - "
                    "directory likely doesn't support GETPEERLIST (reference implementation)"
                )
                self._peerlist_supported = False
                return []
            except Exception as e:
                logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}")
                if asyncio.get_event_loop().time() - start_time > self.timeout:
                    self._peerlist_supported = False
                    return []

        peerlist_str = response["line"]
        logger.debug(f"Peerlist string: {peerlist_str}")

        # Mark peerlist as supported since we got a valid response
        self._peerlist_supported = True

        if not peerlist_str:
            return []

        peers: list[tuple[str, str, FeatureSet]] = []
        for entry in peerlist_str.split(","):
            # Skip empty entries
            if not entry or not entry.strip():
                continue
            # Skip entries without separator - these are metadata (e.g., 'peerlist_features')
            # from the reference implementation, not actual peer entries
            if NICK_PEERLOCATOR_SEPARATOR not in entry:
                logger.debug(f"Skipping metadata entry in peerlist: '{entry}'")
                continue
            try:
                nick, location, disconnected, features = parse_peerlist_entry(entry)
                logger.debug(
                    f"Parsed peer: {nick} at {location}, "
                    f"disconnected={disconnected}, features={features.to_comma_string()}"
                )
                if not disconnected:
                    peers.append((nick, location, features))
                    # Always update peer_features cache to track that we've seen this peer
                    # This prevents triggering "new peer" logic for every message from this peer
                    self.peer_features[nick] = features.to_dict()
            except ValueError as e:
                logger.warning(f"Failed to parse peerlist entry '{entry}': {e}")
                continue

        logger.info(
            f"Received {len(peers)} active peers with features from {self.host}:{self.port}"
        )
        return peers

    async def listen_for_messages(self, duration: float = 5.0) -> list[dict[str, Any]]:
        """
        Listen for messages for a specified duration.

        This method collects all messages received within the specified duration.
        It properly handles connection closed errors by raising DirectoryClientError.

        Args:
            duration: How long to listen in seconds

        Returns:
            List of received messages

        Raises:
            DirectoryClientError: If not connected or connection is lost
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        # Check connection state before starting
        if not self.connection.is_connected():
            raise DirectoryClientError("Connection closed")

        messages: list[dict[str, Any]] = []
        start_time = asyncio.get_event_loop().time()

        while asyncio.get_event_loop().time() - start_time < duration:
            try:
                remaining_time = duration - (asyncio.get_event_loop().time() - start_time)
                if remaining_time <= 0:
                    break

                response_data = await asyncio.wait_for(
                    self.connection.receive(), timeout=remaining_time
                )
                response = json.loads(response_data.decode("utf-8"))
                logger.trace(
                    f"Received message type {response.get('type')}: "
                    f"{response.get('line', '')[:80]}..."
                )
                messages.append(response)

            except TimeoutError:
                # Normal timeout - no more messages within duration
                break
            except Exception as e:
                # Connection errors should propagate up so caller can reconnect
                error_msg = str(e).lower()
                if "connection" in error_msg and ("closed" in error_msg or "lost" in error_msg):
                    raise DirectoryClientError(f"Connection lost: {e}") from e
                # Other errors (JSON parse, etc) - log and continue
                logger.warning(f"Error processing message: {e}")
                continue

        logger.trace(f"Collected {len(messages)} messages in {duration}s")
        return messages

    async def fetch_orderbooks(self) -> tuple[list[Offer], list[FidelityBond]]:
        """
        Fetch orderbooks from all connected peers.

        Returns:
            Tuple of (offers, fidelity_bonds)
        """
        # Use get_peerlist_with_features to populate peer_features cache
        peers_with_features = await self.get_peerlist_with_features()
        offers: list[Offer] = []
        bonds: list[FidelityBond] = []
        bond_utxo_set: set[str] = set()

        # Build set of active nicks for filtering stale offers
        # Use peerlist_with_features if available, otherwise fall back to basic peerlist
        active_nicks: set[str] = set()
        if peers_with_features:
            active_nicks = {nick for nick, _loc, _features in peers_with_features}
            logger.info(f"Found {len(peers_with_features)} peers on {self.host}:{self.port}")
        else:
            # Fallback for directories without peerlist_features support (reference impl)
            # or when all peers are NOT-SERVING-ONION (regtest/local)
            try:
                basic_peerlist = await self.get_peerlist()
                if basic_peerlist:
                    active_nicks = set(basic_peerlist)
                    logger.info(
                        f"Found {len(basic_peerlist)} peers on {self.host}:{self.port} (basic peerlist)"
                    )
                else:
                    logger.info(
                        f"Peerlist empty on {self.host}:{self.port} (makers may be NOT-SERVING-ONION)"
                    )
            except DirectoryClientError as e:
                logger.warning(f"Failed to get basic peerlist: {e}")

        if not self.connection:
            raise DirectoryClientError("Not connected")

        pubmsg = {
            "type": MessageType.PUBMSG.value,
            "line": f"{self.nick}!PUBLIC!!orderbook",
        }
        await self.connection.send(json.dumps(pubmsg).encode("utf-8"))
        logger.debug("Sent !orderbook broadcast to PUBLIC")

        logger.info("Listening for offer announcements for 10 seconds...")
        messages = await self.listen_for_messages(duration=10.0)

        logger.info(f"Received {len(messages)} messages, parsing offers...")

        for response in messages:
            try:
                msg_type = response.get("type")
                if msg_type not in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value):
                    logger.debug(f"Skipping message type {msg_type}")
                    continue

                line = response["line"]
                logger.debug(f"Processing message type {msg_type}: {line[:100]}...")

                parts = line.split(COMMAND_PREFIX)
                if len(parts) < 3:
                    logger.debug(f"Message has insufficient parts: {len(parts)}")
                    continue

                from_nick = parts[0]
                to_nick = parts[1]
                rest = COMMAND_PREFIX.join(parts[2:])

                if not rest.strip():
                    logger.debug("Empty message content")
                    continue

                offer_types = ["sw0absoffer", "sw0reloffer", "swabsoffer", "swreloffer"]
                parsed = False
                for offer_type in offer_types:
                    if rest.startswith(offer_type):
                        try:
                            # Split on '!' to extract flags (neutrino, tbond)
                            # Format: sw0reloffer 0 750000 790107726787 500 0.001!neutrino!tbond <proof>
                            # NOTE: !neutrino in offers is deprecated - primary detection is via
                            # handshake features. This parsing is kept for backwards compatibility.
                            rest_parts = rest.split(COMMAND_PREFIX)
                            offer_line = rest_parts[0]
                            bond_data = None
                            neutrino_compat = False

                            # Parse flags after the offer line (backwards compat for !neutrino)
                            for flag_part in rest_parts[1:]:
                                if flag_part.startswith("neutrino"):
                                    neutrino_compat = True
                                    logger.debug(f"Maker {from_nick} requires neutrino_compat")
                                elif flag_part.startswith("tbond "):
                                    bond_parts = flag_part[6:].split()
                                    if bond_parts:
                                        bond_proof_b64 = bond_parts[0]
                                        # For PRIVMSG, the maker signs with taker's actual nick
                                        # For PUBMSG, both nicks are the maker's (self-signed)
                                        is_privmsg = msg_type == MessageType.PRIVMSG.value
                                        taker_nick_for_proof = to_nick if is_privmsg else from_nick
                                        bond_data = parse_fidelity_bond_proof(
                                            bond_proof_b64, from_nick, taker_nick_for_proof
                                        )
                                        if bond_data:
                                            logger.debug(
                                                f"Parsed fidelity bond from {from_nick}: "
                                                f"txid={bond_data['utxo_txid'][:16]}..., "
                                                f"locktime={bond_data['locktime']}"
                                            )

                                            utxo_str = (
                                                f"{bond_data['utxo_txid']}:{bond_data['utxo_vout']}"
                                            )
                                            if utxo_str not in bond_utxo_set:
                                                bond_utxo_set.add(utxo_str)
                                                bond = FidelityBond(
                                                    counterparty=from_nick,
                                                    utxo_txid=bond_data["utxo_txid"],
                                                    utxo_vout=bond_data["utxo_vout"],
                                                    locktime=bond_data["locktime"],
                                                    script=bond_data["utxo_pub"],
                                                    utxo_confirmations=0,
                                                    cert_expiry=bond_data["cert_expiry"],
                                                    fidelity_bond_data=bond_data,
                                                )
                                                bonds.append(bond)

                            offer_parts = offer_line.split()
                            if len(offer_parts) < 6:
                                logger.warning(
                                    f"Offer from {from_nick} has {len(offer_parts)} parts, need 6"
                                )
                                continue

                            oid = int(offer_parts[1])
                            minsize = int(offer_parts[2])
                            maxsize = int(offer_parts[3])
                            txfee = int(offer_parts[4])
                            cjfee_str = offer_parts[5]

                            if offer_type in ["sw0absoffer", "swabsoffer"]:
                                cjfee = str(int(cjfee_str))
                            else:
                                cjfee = str(Decimal(cjfee_str))

                            offer = Offer(
                                counterparty=from_nick,
                                oid=oid,
                                ordertype=OfferType(offer_type),
                                minsize=minsize,
                                maxsize=maxsize,
                                txfee=txfee,
                                cjfee=cjfee,
                                fidelity_bond_value=0,
                                neutrino_compat=neutrino_compat,
                                features=self.peer_features.get(from_nick, {}),
                            )
                            offers.append(offer)

                            if bond_data:
                                offer.fidelity_bond_data = bond_data

                            logger.info(
                                f"Parsed {offer_type} from {from_nick}: "
                                f"oid={oid}, size={minsize}-{maxsize}, fee={cjfee}, "
                                f"has_bond={bond_data is not None}, neutrino_compat={neutrino_compat}"
                            )
                            parsed = True
                        except Exception as e:
                            logger.warning(f"Failed to parse {offer_type} from {from_nick}: {e}")
                        break

                if not parsed:
                    logger.debug(f"Message not an offer: {rest[:50]}...")

            except Exception as e:
                logger.warning(f"Failed to process message: {e}")
                continue

        # Filter offers to only include makers that are still in the current peerlist.
        # This prevents selecting stale offers from makers that have disconnected.
        # This is especially important for flaky tests where makers may restart or
        # disconnect between orderbook fetch and CoinJoin execution.
        #
        # Note: If peerlist is empty, we skip filtering and trust the offers. This happens when:
        # 1. All peers use NOT-SERVING-ONION (regtest/local environments)
        # 2. Directory doesn't support GETPEERLIST (reference implementation)
        #
        # The directory server will still reject messages to disconnected peers,
        # so we're not at risk of sending messages to offline makers.
        if active_nicks:
            original_count = len(offers)
            offers = [o for o in offers if o.counterparty in active_nicks]
            filtered_count = original_count - len(offers)
            if filtered_count > 0:
                logger.warning(
                    f"Filtered out {filtered_count} stale offers from disconnected makers"
                )
        elif self._peerlist_supported is False:
            logger.debug(
                "Skipping offer filtering - directory doesn't support GETPEERLIST "
                "(reference implementation)"
            )

        logger.info(
            f"Fetched {len(offers)} offers and {len(bonds)} fidelity bonds from "
            f"{self.host}:{self.port}"
        )
        return offers, bonds

    async def send_public_message(self, message: str) -> None:
        """
        Send a public message to all peers.

        Args:
            message: Message to broadcast
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        pubmsg = {
            "type": MessageType.PUBMSG.value,
            "line": f"{self.nick}!PUBLIC!{message}",
        }
        await self.connection.send(json.dumps(pubmsg).encode("utf-8"))

    async def send_private_message(self, recipient: str, command: str, data: str) -> None:
        """
        Send a signed private message to a specific peer.

        JoinMarket requires all private messages to be signed with the sender's
        nick private key. The signature is appended to the message:
        Format: "!<command> <data> <pubkey_hex> <signature>"

        The message-to-sign is: data + hostid (to prevent replay attacks)
        Note: Only the data is signed, NOT the command prefix.

        Args:
            recipient: Target peer nick
            command: Command name (without ! prefix, e.g., 'fill', 'auth', 'tx')
            data: Command arguments to send (will be signed)
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        # Sign just the data (not the command) with our nick identity
        # Reference: rawmessage = ' '.join(message[1:].split(' ')[1:-2])
        # This means they extract [1:-2] which is the args, not the command
        # So we sign: data + hostid
        signed_data = self.nick_identity.sign_message(data, self.hostid)

        # JoinMarket message format: from_nick!to_nick!command <args>
        # The COMMAND_PREFIX ("!") is used ONLY as a field separator between
        # from_nick, to_nick, and the message content. The command itself
        # does NOT have a "!" prefix.
        # Format: "<command> <signed_data>" where signed_data = "<data> <pubkey_hex> <sig_b64>"
        full_message = f"{command} {signed_data}"

        privmsg = {
            "type": MessageType.PRIVMSG.value,
            "line": f"{self.nick}!{recipient}!{full_message}",
        }
        await self.connection.send(json.dumps(privmsg).encode("utf-8"))

    async def close(self) -> None:
        """Close the connection to the directory server."""
        if self.connection:
            try:
                # NOTE: We skip sending DISCONNECT (801) because the reference implementation
                # crashes on unhandled control messages.
                pass
            except Exception:
                pass
            finally:
                await self.connection.close()
                self.connection = None

    def stop(self) -> None:
        """Stop continuous listening."""
        self.running = False

    async def listen_continuously(self, request_orderbook: bool = True) -> None:
        """
        Continuously listen for messages and update internal offer/bond caches.

        This method runs indefinitely until stop() is called or connection is lost.
        Used by orderbook_watcher and maker to maintain live orderbook state.

        Args:
            request_orderbook: If True, send !orderbook request on startup to get
                current offers from makers. Set to False for maker bots that don't
                need to receive other offers.
        """
        if not self.connection:
            raise DirectoryClientError("Not connected")

        logger.info(f"Starting continuous listening on {self.host}:{self.port}")
        self.running = True

        # Fetch peerlist with features to populate peer_features cache
        # This allows us to know which features each maker supports
        # Note: This may return empty if directory doesn't support GETPEERLIST (reference impl)
        try:
            await self.get_peerlist_with_features()
            if self._peerlist_supported:
                logger.info(f"Populated peer_features cache with {len(self.peer_features)} peers")
            else:
                logger.info(
                    "Directory doesn't support GETPEERLIST - peer features will be "
                    "learned from offer messages"
                )
        except Exception as e:
            logger.warning(f"Failed to fetch peerlist with features: {e}")

        # Request current orderbook from makers
        if request_orderbook:
            try:
                pubmsg = {
                    "type": MessageType.PUBMSG.value,
                    "line": f"{self.nick}!PUBLIC!!orderbook",
                }
                await self.connection.send(json.dumps(pubmsg).encode("utf-8"))
                logger.info("Sent !orderbook request to get current offers")
            except Exception as e:
                logger.warning(f"Failed to send !orderbook request: {e}")

        # Track when we last sent an orderbook request (to avoid spamming)
        import time

        last_orderbook_request = time.time()
        orderbook_request_min_interval = 60.0  # Minimum 60 seconds between requests

        while self.running:
            try:
                # Read next message with timeout
                data = await asyncio.wait_for(self.connection.receive(), timeout=5.0)

                if not data:
                    logger.warning(f"Connection to {self.host}:{self.port} closed")
                    break

                message = json.loads(data.decode("utf-8"))
                msg_type = message.get("type")
                line = message.get("line", "")

                # Process PUBMSG and PRIVMSG to update offers/bonds cache
                # Reference implementation sends offer responses to !orderbook via PRIVMSG
                if msg_type in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value):
                    try:
                        parts = line.split(COMMAND_PREFIX)
                        if len(parts) >= 3:
                            from_nick = parts[0]
                            to_nick = parts[1]
                            rest = COMMAND_PREFIX.join(parts[2:])

                            # Accept PUBLIC broadcasts or messages addressed to us
                            if to_nick == "PUBLIC" or to_nick == self.nick:
                                # If we don't have features for this peer, it's a new peer.
                                # We can try to refresh peerlist, but respect rate limits
                                # and don't spam if directory doesn't support it.
                                is_new_peer = from_nick not in self.peer_features
                                current_time = time.time()

                                if is_new_peer and self._peerlist_supported is not False:
                                    # Only refresh peerlist if we haven't recently
                                    # (get_peerlist_with_features has its own rate limiting)
                                    try:
                                        await self.get_peerlist_with_features()
                                        if self._peerlist_supported:
                                            logger.debug(
                                                f"Refreshed peerlist (new peer: {from_nick}), "
                                                f"now tracking {len(self.peer_features)} peers"
                                            )
                                    except Exception as e:
                                        logger.debug(f"Failed to refresh peerlist: {e}")

                                    # Request orderbook from new peer (rate-limited)
                                    if (
                                        request_orderbook
                                        and current_time - last_orderbook_request
                                        > orderbook_request_min_interval
                                    ):
                                        try:
                                            pubmsg = {
                                                "type": MessageType.PUBMSG.value,
                                                "line": f"{self.nick}!PUBLIC!!orderbook",
                                            }
                                            await self.connection.send(
                                                json.dumps(pubmsg).encode("utf-8")
                                            )
                                            last_orderbook_request = current_time
                                            logger.info(
                                                f"Sent !orderbook request for new peer {from_nick}"
                                            )
                                        except Exception as e:
                                            logger.debug(f"Failed to send !orderbook: {e}")

                                # Parse offer announcements
                                for offer_type_prefix in [
                                    "sw0reloffer",
                                    "sw0absoffer",
                                    "swreloffer",
                                    "swabsoffer",
                                ]:
                                    if rest.startswith(offer_type_prefix):
                                        # Separate offer from fidelity bond data
                                        rest_parts = rest.split(COMMAND_PREFIX, 1)
                                        offer_line = rest_parts[0].strip()

                                        # Parse fidelity bond if present
                                        bond_data = None
                                        if len(rest_parts) > 1 and rest_parts[1].startswith(
                                            "tbond "
                                        ):
                                            bond_parts = rest_parts[1][6:].split()
                                            if bond_parts:
                                                bond_proof_b64 = bond_parts[0]
                                                # For PUBLIC announcements, maker uses their own nick
                                                # as taker_nick when creating the proof.
                                                # For PRIVMSG (response to !orderbook), maker signs
                                                # for the recipient (us).
                                                taker_nick_for_proof = (
                                                    from_nick if to_nick == "PUBLIC" else to_nick
                                                )
                                                bond_data = parse_fidelity_bond_proof(
                                                    bond_proof_b64, from_nick, taker_nick_for_proof
                                                )
                                                if bond_data:
                                                    logger.debug(
                                                        f"Parsed fidelity bond from {from_nick}: "
                                                        f"txid={bond_data['utxo_txid'][:16]}..., "
                                                        f"locktime={bond_data['locktime']}"
                                                    )
                                                    # Store bond in bonds cache
                                                    utxo_str = (
                                                        f"{bond_data['utxo_txid']}:"
                                                        f"{bond_data['utxo_vout']}"
                                                    )
                                                    bond = FidelityBond(
                                                        counterparty=from_nick,
                                                        utxo_txid=bond_data["utxo_txid"],
                                                        utxo_vout=bond_data["utxo_vout"],
                                                        locktime=bond_data["locktime"],
                                                        script=bond_data["utxo_pub"],
                                                        utxo_confirmations=0,
                                                        cert_expiry=bond_data["cert_expiry"],
                                                        fidelity_bond_data=bond_data,
                                                    )
                                                    self.bonds[utxo_str] = bond

                                        offer_parts = offer_line.split()
                                        if len(offer_parts) >= 6:
                                            try:
                                                oid = int(offer_parts[1])
                                                minsize = int(offer_parts[2])
                                                maxsize = int(offer_parts[3])
                                                txfee = int(offer_parts[4])
                                                cjfee_str = offer_parts[5]

                                                if offer_type_prefix in [
                                                    "sw0absoffer",
                                                    "swabsoffer",
                                                ]:
                                                    cjfee = str(int(cjfee_str))
                                                else:
                                                    cjfee = str(Decimal(cjfee_str))

                                                offer = Offer(
                                                    counterparty=from_nick,
                                                    oid=oid,
                                                    ordertype=OfferType(offer_type_prefix),
                                                    minsize=minsize,
                                                    maxsize=maxsize,
                                                    txfee=txfee,
                                                    cjfee=cjfee,
                                                    fidelity_bond_value=0,
                                                    fidelity_bond_data=bond_data,
                                                    features=self.peer_features.get(from_nick, {}),
                                                )

                                                # Update cache using tuple key
                                                offer_key = (from_nick, oid)
                                                self.offers[offer_key] = offer

                                                # Track this peer as "known" even if peerlist didn't
                                                # return features. This prevents re-triggering new peer
                                                # logic for every message from this peer.
                                                if from_nick not in self.peer_features:
                                                    self.peer_features[from_nick] = {}

                                                logger.debug(
                                                    f"Updated offer cache: {from_nick} "
                                                    f"{offer_type_prefix} oid={oid}"
                                                    + (" (with bond)" if bond_data else "")
                                                )
                                            except Exception as e:
                                                logger.debug(f"Failed to parse offer update: {e}")
                                        break
                    except Exception as e:
                        logger.debug(f"Failed to process PUBMSG: {e}")

            except TimeoutError:
                continue
            except asyncio.CancelledError:
                logger.info(f"Continuous listening on {self.host}:{self.port} cancelled")
                break
            except Exception as e:
                logger.error(f"Error in continuous listening: {e}")
                if self.on_disconnect:
                    self.on_disconnect()
                break

        self.running = False
        logger.info(f"Stopped continuous listening on {self.host}:{self.port}")

    def get_current_offers(self) -> list[Offer]:
        """Get the current list of cached offers."""
        return list(self.offers.values())

    def get_current_bonds(self) -> list[FidelityBond]:
        """Get the current list of cached fidelity bonds."""
        return list(self.bonds.values())

    def supports_extended_utxo_format(self) -> bool:
        """
        Check if we should use extended UTXO format with this directory.

        Extended format (txid:vout:scriptpubkey:blockheight) is used when
        both sides advertise neutrino_compat feature. Protocol version
        is not checked - features are negotiated independently.

        Returns:
            True if extended UTXO format should be used
        """
        return self.neutrino_compat and self.directory_neutrino_compat

    def get_negotiated_version(self) -> int:
        """
        Get the negotiated protocol version.

        Returns:
            Negotiated version (always 5 with feature-based approach)
        """
        return self.negotiated_version if self.negotiated_version is not None else JM_VERSION

Client for connecting to JoinMarket directory servers.

Supports:
- Direct TCP connections (for local/dev)
- Tor connections (for .onion addresses)
- Handshake protocol
- Peerlist fetching
- Orderbook fetching
- Continuous listening for updates

Initialize DirectoryClient.

Args

host
Directory server hostname or .onion address
port
Directory server port
network
Bitcoin network (mainnet, testnet, signet, regtest)
nick_identity
NickIdentity for message signing (generated if None)
location
Our location string (onion address or NOT-SERVING-ONION)
socks_host
SOCKS proxy host for Tor
socks_port
SOCKS proxy port for Tor
timeout
Connection timeout in seconds
max_message_size
Maximum message size in bytes
on_disconnect
Callback when connection drops
neutrino_compat
Advertise support for Neutrino-compatible UTXO metadata
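
A minimal usage sketch tying the pieces together: construct the client, connect (which performs the handshake), fetch the orderbook once, and close. The host, port, and import path are illustrative assumptions; the method calls match the source above.

import asyncio

# Assumed import path for the class documented here.
from jmcore.directory_client import DirectoryClient

async def main() -> None:
    client = DirectoryClient(host="127.0.0.1", port=5222, network="regtest")
    await client.connect()  # direct TCP (non-.onion host) plus handshake
    try:
        offers, bonds = await client.fetch_orderbooks()
        for offer in offers:
            print(offer.counterparty, offer.ordertype, offer.minsize, offer.maxsize, offer.cjfee)
        print(f"{len(bonds)} fidelity bonds seen")
    finally:
        await client.close()

asyncio.run(main())
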

Methods

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Close the connection to the directory server."""
    if self.connection:
        try:
            # NOTE: We skip sending DISCONNECT (801) because the reference implementation
            # crashes on unhandled control messages.
            pass
        except Exception:
            pass
        finally:
            await self.connection.close()
            self.connection = None

Close the connection to the directory server.

async def connect(self) ‑> None
Expand source code
async def connect(self) -> None:
    """Connect to the directory server and perform handshake."""
    try:
        logger.debug(f"DirectoryClient.connect: connecting to {self.host}:{self.port}")
        if not self.host.endswith(".onion"):
            self.connection = await connect_direct(
                self.host,
                self.port,
                self.max_message_size,
                self.timeout,
            )
            logger.debug("DirectoryClient.connect: direct connection established")
        else:
            self.connection = await connect_via_tor(
                self.host,
                self.port,
                self.socks_host,
                self.socks_port,
                self.max_message_size,
                self.timeout,
            )
            logger.debug("DirectoryClient.connect: tor connection established")
        logger.debug("DirectoryClient.connect: starting handshake")
        await self._handshake()
        logger.debug("DirectoryClient.connect: handshake complete")
    except Exception as e:
        logger.error(f"Failed to connect to {self.host}:{self.port}: {e}", exc_info=True)
        # Clean up connection if handshake failed
        if self.connection:
            with contextlib.suppress(Exception):
                await self.connection.close()
            self.connection = None
        raise DirectoryClientError(f"Connection failed: {e}") from e

Connect to the directory server and perform handshake.
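
Because connect() wraps every failure (including a rejected handshake) in DirectoryClientError, callers typically retry with backoff before giving up. A sketch, assuming DirectoryClientError is importable alongside DirectoryClient; the retry policy itself is illustrative, not part of the library.

import asyncio

from jmcore.directory_client import DirectoryClient, DirectoryClientError  # assumed import path

async def connect_with_retry(client: DirectoryClient, attempts: int = 3) -> None:
    """Try connect() up to `attempts` times with exponential backoff."""
    delay = 5.0
    for attempt in range(1, attempts + 1):
        try:
            await client.connect()
            return
        except DirectoryClientError as exc:
            if attempt == attempts:
                raise
            print(f"connect attempt {attempt} failed: {exc}; retrying in {delay:.0f}s")
            await asyncio.sleep(delay)
            delay *= 2
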

async def fetch_orderbooks(self) ‑> tuple[list[Offer], list[FidelityBond]]
Expand source code
async def fetch_orderbooks(self) -> tuple[list[Offer], list[FidelityBond]]:
    """
    Fetch orderbooks from all connected peers.

    Returns:
        Tuple of (offers, fidelity_bonds)
    """
    # Use get_peerlist_with_features to populate peer_features cache
    peers_with_features = await self.get_peerlist_with_features()
    offers: list[Offer] = []
    bonds: list[FidelityBond] = []
    bond_utxo_set: set[str] = set()

    # Build set of active nicks for filtering stale offers
    # Use peerlist_with_features if available, otherwise fall back to basic peerlist
    active_nicks: set[str] = set()
    if peers_with_features:
        active_nicks = {nick for nick, _loc, _features in peers_with_features}
        logger.info(f"Found {len(peers_with_features)} peers on {self.host}:{self.port}")
    else:
        # Fallback for directories without peerlist_features support (reference impl)
        # or when all peers are NOT-SERVING-ONION (regtest/local)
        try:
            basic_peerlist = await self.get_peerlist()
            if basic_peerlist:
                active_nicks = set(basic_peerlist)
                logger.info(
                    f"Found {len(basic_peerlist)} peers on {self.host}:{self.port} (basic peerlist)"
                )
            else:
                logger.info(
                    f"Peerlist empty on {self.host}:{self.port} (makers may be NOT-SERVING-ONION)"
                )
        except DirectoryClientError as e:
            logger.warning(f"Failed to get basic peerlist: {e}")

    if not self.connection:
        raise DirectoryClientError("Not connected")

    pubmsg = {
        "type": MessageType.PUBMSG.value,
        "line": f"{self.nick}!PUBLIC!!orderbook",
    }
    await self.connection.send(json.dumps(pubmsg).encode("utf-8"))
    logger.debug("Sent !orderbook broadcast to PUBLIC")

    logger.info("Listening for offer announcements for 10 seconds...")
    messages = await self.listen_for_messages(duration=10.0)

    logger.info(f"Received {len(messages)} messages, parsing offers...")

    for response in messages:
        try:
            msg_type = response.get("type")
            if msg_type not in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value):
                logger.debug(f"Skipping message type {msg_type}")
                continue

            line = response["line"]
            logger.debug(f"Processing message type {msg_type}: {line[:100]}...")

            parts = line.split(COMMAND_PREFIX)
            if len(parts) < 3:
                logger.debug(f"Message has insufficient parts: {len(parts)}")
                continue

            from_nick = parts[0]
            to_nick = parts[1]
            rest = COMMAND_PREFIX.join(parts[2:])

            if not rest.strip():
                logger.debug("Empty message content")
                continue

            offer_types = ["sw0absoffer", "sw0reloffer", "swabsoffer", "swreloffer"]
            parsed = False
            for offer_type in offer_types:
                if rest.startswith(offer_type):
                    try:
                        # Split on '!' to extract flags (neutrino, tbond)
                        # Format: sw0reloffer 0 750000 790107726787 500 0.001!neutrino!tbond <proof>
                        # NOTE: !neutrino in offers is deprecated - primary detection is via
                        # handshake features. This parsing is kept for backwards compatibility.
                        rest_parts = rest.split(COMMAND_PREFIX)
                        offer_line = rest_parts[0]
                        bond_data = None
                        neutrino_compat = False

                        # Parse flags after the offer line (backwards compat for !neutrino)
                        for flag_part in rest_parts[1:]:
                            if flag_part.startswith("neutrino"):
                                neutrino_compat = True
                                logger.debug(f"Maker {from_nick} requires neutrino_compat")
                            elif flag_part.startswith("tbond "):
                                bond_parts = flag_part[6:].split()
                                if bond_parts:
                                    bond_proof_b64 = bond_parts[0]
                                    # For PRIVMSG, the maker signs with taker's actual nick
                                    # For PUBMSG, both nicks are the maker's (self-signed)
                                    is_privmsg = msg_type == MessageType.PRIVMSG.value
                                    taker_nick_for_proof = to_nick if is_privmsg else from_nick
                                    bond_data = parse_fidelity_bond_proof(
                                        bond_proof_b64, from_nick, taker_nick_for_proof
                                    )
                                    if bond_data:
                                        logger.debug(
                                            f"Parsed fidelity bond from {from_nick}: "
                                            f"txid={bond_data['utxo_txid'][:16]}..., "
                                            f"locktime={bond_data['locktime']}"
                                        )

                                        utxo_str = (
                                            f"{bond_data['utxo_txid']}:{bond_data['utxo_vout']}"
                                        )
                                        if utxo_str not in bond_utxo_set:
                                            bond_utxo_set.add(utxo_str)
                                            bond = FidelityBond(
                                                counterparty=from_nick,
                                                utxo_txid=bond_data["utxo_txid"],
                                                utxo_vout=bond_data["utxo_vout"],
                                                locktime=bond_data["locktime"],
                                                script=bond_data["utxo_pub"],
                                                utxo_confirmations=0,
                                                cert_expiry=bond_data["cert_expiry"],
                                                fidelity_bond_data=bond_data,
                                            )
                                            bonds.append(bond)

                        offer_parts = offer_line.split()
                        if len(offer_parts) < 6:
                            logger.warning(
                                f"Offer from {from_nick} has {len(offer_parts)} parts, need 6"
                            )
                            continue

                        oid = int(offer_parts[1])
                        minsize = int(offer_parts[2])
                        maxsize = int(offer_parts[3])
                        txfee = int(offer_parts[4])
                        cjfee_str = offer_parts[5]

                        if offer_type in ["sw0absoffer", "swabsoffer"]:
                            cjfee = str(int(cjfee_str))
                        else:
                            cjfee = str(Decimal(cjfee_str))

                        offer = Offer(
                            counterparty=from_nick,
                            oid=oid,
                            ordertype=OfferType(offer_type),
                            minsize=minsize,
                            maxsize=maxsize,
                            txfee=txfee,
                            cjfee=cjfee,
                            fidelity_bond_value=0,
                            neutrino_compat=neutrino_compat,
                            features=self.peer_features.get(from_nick, {}),
                        )
                        offers.append(offer)

                        if bond_data:
                            offer.fidelity_bond_data = bond_data

                        logger.info(
                            f"Parsed {offer_type} from {from_nick}: "
                            f"oid={oid}, size={minsize}-{maxsize}, fee={cjfee}, "
                            f"has_bond={bond_data is not None}, neutrino_compat={neutrino_compat}"
                        )
                        parsed = True
                    except Exception as e:
                        logger.warning(f"Failed to parse {offer_type} from {from_nick}: {e}")
                    break

            if not parsed:
                logger.debug(f"Message not an offer: {rest[:50]}...")

        except Exception as e:
            logger.warning(f"Failed to process message: {e}")
            continue

    # Filter offers to only include makers that are still in the current peerlist.
    # This prevents selecting stale offers from makers that have disconnected.
    # This is especially important for flaky tests where makers may restart or
    # disconnect between orderbook fetch and CoinJoin execution.
    #
    # Note: If peerlist is empty, we skip filtering and trust the offers. This happens when:
    # 1. All peers use NOT-SERVING-ONION (regtest/local environments)
    # 2. Directory doesn't support GETPEERLIST (reference implementation)
    #
    # The directory server will still reject messages to disconnected peers,
    # so we're not at risk of sending messages to offline makers.
    if active_nicks:
        original_count = len(offers)
        offers = [o for o in offers if o.counterparty in active_nicks]
        filtered_count = original_count - len(offers)
        if filtered_count > 0:
            logger.warning(
                f"Filtered out {filtered_count} stale offers from disconnected makers"
            )
    elif self._peerlist_supported is False:
        logger.debug(
            "Skipping offer filtering - directory doesn't support GETPEERLIST "
            "(reference implementation)"
        )

    logger.info(
        f"Fetched {len(offers)} offers and {len(bonds)} fidelity bonds from "
        f"{self.host}:{self.port}"
    )
    return offers, bonds

Fetch orderbooks from all connected peers.

Returns

Tuple of (offers, fidelity_bonds)
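Example (usage sketch, not from the source; imports omitted, names as documented on this page): assuming `client` is an already-connected DirectoryClient and that the orderbook-fetching method shown above is exposed as `fetch_orderbook()` (name assumed here), the returned offers can be filtered by size and cross-referenced with the returned bonds:

async def pick_offers(client, amount_sats: int):
    # `fetch_orderbook` is an assumed name - use the actual method documented above.
    offers, bonds = await client.fetch_orderbook()
    # Keep offers whose size range covers the intended CoinJoin amount.
    usable = [o for o in offers if o.minsize <= amount_sats <= o.maxsize]
    # Prefer makers that also announced a fidelity bond.
    bonded = {b.counterparty for b in bonds}
    usable.sort(key=lambda o: o.counterparty in bonded, reverse=True)
    return usable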

def get_current_bonds(self) ‑> list[FidelityBond]
Expand source code
def get_current_bonds(self) -> list[FidelityBond]:
    """Get the current list of cached fidelity bonds."""
    return list(self.bonds.values())

Get the current list of cached fidelity bonds.

def get_current_offers(self) ‑> list[Offer]
Expand source code
def get_current_offers(self) -> list[Offer]:
    """Get the current list of cached offers."""
    return list(self.offers.values())

Get the current list of cached offers.

def get_negotiated_version(self) ‑> int
Expand source code
def get_negotiated_version(self) -> int:
    """
    Get the negotiated protocol version.

    Returns:
        Negotiated version (always 5 with feature-based approach)
    """
    return self.negotiated_version if self.negotiated_version is not None else JM_VERSION

Get the negotiated protocol version.

Returns

Negotiated version (always 5 with feature-based approach)

async def get_peerlist(self) ‑> list[str]
Expand source code
async def get_peerlist(self) -> list[str]:
    """
    Fetch the current list of connected peers.

    Note: Reference implementation directories do NOT support GETPEERLIST.
    This method shares peerlist support tracking with get_peerlist_with_features().

    Returns:
        List of active peer nicks. Returns empty list if directory doesn't
        support GETPEERLIST.
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    # Skip if we already know this directory doesn't support GETPEERLIST
    if self._peerlist_supported is False:
        logger.debug("Skipping GETPEERLIST - directory doesn't support it")
        return []

    # Rate-limit peerlist requests to avoid spamming
    import time

    current_time = time.time()
    if current_time - self._last_peerlist_request_time < self._peerlist_min_interval:
        logger.debug(
            f"Skipping GETPEERLIST - rate limited "
            f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)"
        )
        return []

    self._last_peerlist_request_time = current_time

    getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""}
    logger.debug("Sending GETPEERLIST request")
    await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8"))

    start_time = asyncio.get_event_loop().time()
    response = None

    while True:
        elapsed = asyncio.get_event_loop().time() - start_time
        if elapsed > self.timeout:
            # Timeout without PEERLIST response - directory likely doesn't support it
            logger.info(
                f"Timed out waiting for PEERLIST from {self.host}:{self.port} - "
                "directory likely doesn't support GETPEERLIST (reference implementation)"
            )
            self._peerlist_supported = False
            return []

        try:
            response_data = await asyncio.wait_for(
                self.connection.receive(), timeout=self.timeout - elapsed
            )
            response = json.loads(response_data.decode("utf-8"))
            msg_type = response.get("type")
            logger.debug(f"Received response type: {msg_type}")

            if msg_type == MessageType.PEERLIST.value:
                break

            logger.debug(
                f"Skipping unexpected message type {msg_type} while waiting for PEERLIST"
            )
        except TimeoutError:
            # Timeout without PEERLIST response - directory likely doesn't support it
            logger.info(
                f"Timed out waiting for PEERLIST from {self.host}:{self.port} - "
                "directory likely doesn't support GETPEERLIST (reference implementation)"
            )
            self._peerlist_supported = False
            return []
        except Exception as e:
            logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}")
            if asyncio.get_event_loop().time() - start_time > self.timeout:
                self._peerlist_supported = False
                return []

    peerlist_str = response["line"]
    logger.debug(f"Peerlist string: {peerlist_str}")

    # Mark peerlist as supported since we got a valid response
    self._peerlist_supported = True

    if not peerlist_str:
        return []

    peers = []
    for entry in peerlist_str.split(","):
        # Skip empty entries
        if not entry or not entry.strip():
            continue
        # Skip entries without separator - these are metadata (e.g., 'peerlist_features')
        # from the reference implementation, not actual peer entries
        if NICK_PEERLOCATOR_SEPARATOR not in entry:
            logger.debug(f"Skipping metadata entry in peerlist: '{entry}'")
            continue
        try:
            nick, location, disconnected, _features = parse_peerlist_entry(entry)
            logger.debug(f"Parsed peer: {nick} at {location}, disconnected={disconnected}")
            if not disconnected:
                peers.append(nick)
        except ValueError as e:
            logger.warning(f"Failed to parse peerlist entry '{entry}': {e}")
            continue

    logger.info(f"Received {len(peers)} active peers from {self.host}:{self.port}")
    return peers

Fetch the current list of connected peers.

Note: Reference implementation directories do NOT support GETPEERLIST. This method shares peerlist support tracking with get_peerlist_with_features().

Returns

List of active peer nicks. Returns empty list if directory doesn't support GETPEERLIST.

async def get_peerlist_with_features(self) ‑> list[tuple[str, str, FeatureSet]]
Expand source code
async def get_peerlist_with_features(self) -> list[tuple[str, str, FeatureSet]]:
    """
    Fetch the current list of connected peers with their features.

    Uses the standard GETPEERLIST message. If the directory supports
    peerlist_features, the response will include F: suffix with features.

    Note: Reference implementation directories do NOT support GETPEERLIST.
    This method tracks whether the directory supports it and skips requests
    to unsupported directories to avoid spamming warnings in their logs.

    Returns:
        List of (nick, location, features) tuples for active peers.
        Features will be empty for directories that don't support peerlist_features.
        Returns empty list if directory doesn't support GETPEERLIST.
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    # Skip if we already know this directory doesn't support GETPEERLIST
    if self._peerlist_supported is False:
        logger.debug("Skipping GETPEERLIST - directory doesn't support it")
        return []

    # Rate-limit peerlist requests to avoid spamming
    import time

    current_time = time.time()
    if current_time - self._last_peerlist_request_time < self._peerlist_min_interval:
        logger.debug(
            f"Skipping GETPEERLIST - rate limited "
            f"(last request {current_time - self._last_peerlist_request_time:.1f}s ago)"
        )
        return []

    self._last_peerlist_request_time = current_time

    getpeerlist_msg = {"type": MessageType.GETPEERLIST.value, "line": ""}
    logger.debug("Sending GETPEERLIST request")
    await self.connection.send(json.dumps(getpeerlist_msg).encode("utf-8"))

    start_time = asyncio.get_event_loop().time()
    response = None

    while True:
        elapsed = asyncio.get_event_loop().time() - start_time
        if elapsed > self.timeout:
            # Timeout without PEERLIST response - directory likely doesn't support it
            logger.info(
                f"Timed out waiting for PEERLIST from {self.host}:{self.port} - "
                "directory likely doesn't support GETPEERLIST (reference implementation)"
            )
            self._peerlist_supported = False
            return []

        try:
            response_data = await asyncio.wait_for(
                self.connection.receive(), timeout=self.timeout - elapsed
            )
            response = json.loads(response_data.decode("utf-8"))
            msg_type = response.get("type")
            logger.debug(f"Received response type: {msg_type}")

            if msg_type == MessageType.PEERLIST.value:
                break

            logger.debug(
                f"Skipping unexpected message type {msg_type} while waiting for PEERLIST"
            )
        except TimeoutError:
            # Timeout without PEERLIST response - directory likely doesn't support it
            logger.info(
                f"Timed out waiting for PEERLIST from {self.host}:{self.port} - "
                "directory likely doesn't support GETPEERLIST (reference implementation)"
            )
            self._peerlist_supported = False
            return []
        except Exception as e:
            logger.warning(f"Error receiving/parsing message while waiting for PEERLIST: {e}")
            if asyncio.get_event_loop().time() - start_time > self.timeout:
                self._peerlist_supported = False
                return []

    peerlist_str = response["line"]
    logger.debug(f"Peerlist string: {peerlist_str}")

    # Mark peerlist as supported since we got a valid response
    self._peerlist_supported = True

    if not peerlist_str:
        return []

    peers: list[tuple[str, str, FeatureSet]] = []
    for entry in peerlist_str.split(","):
        # Skip empty entries
        if not entry or not entry.strip():
            continue
        # Skip entries without separator - these are metadata (e.g., 'peerlist_features')
        # from the reference implementation, not actual peer entries
        if NICK_PEERLOCATOR_SEPARATOR not in entry:
            logger.debug(f"Skipping metadata entry in peerlist: '{entry}'")
            continue
        try:
            nick, location, disconnected, features = parse_peerlist_entry(entry)
            logger.debug(
                f"Parsed peer: {nick} at {location}, "
                f"disconnected={disconnected}, features={features.to_comma_string()}"
            )
            if not disconnected:
                peers.append((nick, location, features))
                # Always update peer_features cache to track that we've seen this peer
                # This prevents triggering "new peer" logic for every message from this peer
                self.peer_features[nick] = features.to_dict()
        except ValueError as e:
            logger.warning(f"Failed to parse peerlist entry '{entry}': {e}")
            continue

    logger.info(
        f"Received {len(peers)} active peers with features from {self.host}:{self.port}"
    )
    return peers

Fetch the current list of connected peers with their features.

Uses the standard GETPEERLIST message. If the directory supports peerlist_features, the response will include F: suffix with features.

Note: Reference implementation directories do NOT support GETPEERLIST. This method tracks whether the directory supports it and skips requests to unsupported directories to avoid spamming warnings in their logs.

Returns

List of (nick, location, features) tuples for active peers. Features will be empty for directories that don't support peerlist_features. Returns empty list if directory doesn't support GETPEERLIST.
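Example (usage sketch; `client` is assumed to be an already-connected DirectoryClient):

async def find_neutrino_makers(client):
    peers = await client.get_peerlist_with_features()
    # An empty list may simply mean the directory doesn't support GETPEERLIST,
    # not that no peers are online.
    return [nick for nick, _location, features in peers
            if features.supports_neutrino_compat()]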

async def listen_continuously(self, request_orderbook: bool = True) ‑> None
Expand source code
async def listen_continuously(self, request_orderbook: bool = True) -> None:
    """
    Continuously listen for messages and update internal offer/bond caches.

    This method runs indefinitely until stop() is called or connection is lost.
    Used by orderbook_watcher and maker to maintain live orderbook state.

    Args:
        request_orderbook: If True, send !orderbook request on startup to get
            current offers from makers. Set to False for maker bots that don't
            need to receive other offers.
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    logger.info(f"Starting continuous listening on {self.host}:{self.port}")
    self.running = True

    # Fetch peerlist with features to populate peer_features cache
    # This allows us to know which features each maker supports
    # Note: This may return empty if directory doesn't support GETPEERLIST (reference impl)
    try:
        await self.get_peerlist_with_features()
        if self._peerlist_supported:
            logger.info(f"Populated peer_features cache with {len(self.peer_features)} peers")
        else:
            logger.info(
                "Directory doesn't support GETPEERLIST - peer features will be "
                "learned from offer messages"
            )
    except Exception as e:
        logger.warning(f"Failed to fetch peerlist with features: {e}")

    # Request current orderbook from makers
    if request_orderbook:
        try:
            pubmsg = {
                "type": MessageType.PUBMSG.value,
                "line": f"{self.nick}!PUBLIC!!orderbook",
            }
            await self.connection.send(json.dumps(pubmsg).encode("utf-8"))
            logger.info("Sent !orderbook request to get current offers")
        except Exception as e:
            logger.warning(f"Failed to send !orderbook request: {e}")

    # Track when we last sent an orderbook request (to avoid spamming)
    import time

    last_orderbook_request = time.time()
    orderbook_request_min_interval = 60.0  # Minimum 60 seconds between requests

    while self.running:
        try:
            # Read next message with timeout
            data = await asyncio.wait_for(self.connection.receive(), timeout=5.0)

            if not data:
                logger.warning(f"Connection to {self.host}:{self.port} closed")
                break

            message = json.loads(data.decode("utf-8"))
            msg_type = message.get("type")
            line = message.get("line", "")

            # Process PUBMSG and PRIVMSG to update offers/bonds cache
            # Reference implementation sends offer responses to !orderbook via PRIVMSG
            if msg_type in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value):
                try:
                    parts = line.split(COMMAND_PREFIX)
                    if len(parts) >= 3:
                        from_nick = parts[0]
                        to_nick = parts[1]
                        rest = COMMAND_PREFIX.join(parts[2:])

                        # Accept PUBLIC broadcasts or messages addressed to us
                        if to_nick == "PUBLIC" or to_nick == self.nick:
                            # If we don't have features for this peer, it's a new peer.
                            # We can try to refresh peerlist, but respect rate limits
                            # and don't spam if directory doesn't support it.
                            is_new_peer = from_nick not in self.peer_features
                            current_time = time.time()

                            if is_new_peer and self._peerlist_supported is not False:
                                # Only refresh peerlist if we haven't recently
                                # (get_peerlist_with_features has its own rate limiting)
                                try:
                                    await self.get_peerlist_with_features()
                                    if self._peerlist_supported:
                                        logger.debug(
                                            f"Refreshed peerlist (new peer: {from_nick}), "
                                            f"now tracking {len(self.peer_features)} peers"
                                        )
                                except Exception as e:
                                    logger.debug(f"Failed to refresh peerlist: {e}")

                                # Request orderbook from new peer (rate-limited)
                                if (
                                    request_orderbook
                                    and current_time - last_orderbook_request
                                    > orderbook_request_min_interval
                                ):
                                    try:
                                        pubmsg = {
                                            "type": MessageType.PUBMSG.value,
                                            "line": f"{self.nick}!PUBLIC!!orderbook",
                                        }
                                        await self.connection.send(
                                            json.dumps(pubmsg).encode("utf-8")
                                        )
                                        last_orderbook_request = current_time
                                        logger.info(
                                            f"Sent !orderbook request for new peer {from_nick}"
                                        )
                                    except Exception as e:
                                        logger.debug(f"Failed to send !orderbook: {e}")

                            # Parse offer announcements
                            for offer_type_prefix in [
                                "sw0reloffer",
                                "sw0absoffer",
                                "swreloffer",
                                "swabsoffer",
                            ]:
                                if rest.startswith(offer_type_prefix):
                                    # Separate offer from fidelity bond data
                                    rest_parts = rest.split(COMMAND_PREFIX, 1)
                                    offer_line = rest_parts[0].strip()

                                    # Parse fidelity bond if present
                                    bond_data = None
                                    if len(rest_parts) > 1 and rest_parts[1].startswith(
                                        "tbond "
                                    ):
                                        bond_parts = rest_parts[1][6:].split()
                                        if bond_parts:
                                            bond_proof_b64 = bond_parts[0]
                                            # For PUBLIC announcements, maker uses their own nick
                                            # as taker_nick when creating the proof.
                                            # For PRIVMSG (response to !orderbook), maker signs
                                            # for the recipient (us).
                                            taker_nick_for_proof = (
                                                from_nick if to_nick == "PUBLIC" else to_nick
                                            )
                                            bond_data = parse_fidelity_bond_proof(
                                                bond_proof_b64, from_nick, taker_nick_for_proof
                                            )
                                            if bond_data:
                                                logger.debug(
                                                    f"Parsed fidelity bond from {from_nick}: "
                                                    f"txid={bond_data['utxo_txid'][:16]}..., "
                                                    f"locktime={bond_data['locktime']}"
                                                )
                                                # Store bond in bonds cache
                                                utxo_str = (
                                                    f"{bond_data['utxo_txid']}:"
                                                    f"{bond_data['utxo_vout']}"
                                                )
                                                bond = FidelityBond(
                                                    counterparty=from_nick,
                                                    utxo_txid=bond_data["utxo_txid"],
                                                    utxo_vout=bond_data["utxo_vout"],
                                                    locktime=bond_data["locktime"],
                                                    script=bond_data["utxo_pub"],
                                                    utxo_confirmations=0,
                                                    cert_expiry=bond_data["cert_expiry"],
                                                    fidelity_bond_data=bond_data,
                                                )
                                                self.bonds[utxo_str] = bond

                                    offer_parts = offer_line.split()
                                    if len(offer_parts) >= 6:
                                        try:
                                            oid = int(offer_parts[1])
                                            minsize = int(offer_parts[2])
                                            maxsize = int(offer_parts[3])
                                            txfee = int(offer_parts[4])
                                            cjfee_str = offer_parts[5]

                                            if offer_type_prefix in [
                                                "sw0absoffer",
                                                "swabsoffer",
                                            ]:
                                                cjfee = str(int(cjfee_str))
                                            else:
                                                cjfee = str(Decimal(cjfee_str))

                                            offer = Offer(
                                                counterparty=from_nick,
                                                oid=oid,
                                                ordertype=OfferType(offer_type_prefix),
                                                minsize=minsize,
                                                maxsize=maxsize,
                                                txfee=txfee,
                                                cjfee=cjfee,
                                                fidelity_bond_value=0,
                                                fidelity_bond_data=bond_data,
                                                features=self.peer_features.get(from_nick, {}),
                                            )

                                            # Update cache using tuple key
                                            offer_key = (from_nick, oid)
                                            self.offers[offer_key] = offer

                                            # Track this peer as "known" even if peerlist didn't
                                            # return features. This prevents re-triggering new peer
                                            # logic for every message from this peer.
                                            if from_nick not in self.peer_features:
                                                self.peer_features[from_nick] = {}

                                            logger.debug(
                                                f"Updated offer cache: {from_nick} "
                                                f"{offer_type_prefix} oid={oid}"
                                                + (" (with bond)" if bond_data else "")
                                            )
                                        except Exception as e:
                                            logger.debug(f"Failed to parse offer update: {e}")
                                    break
                except Exception as e:
                    logger.debug(f"Failed to process PUBMSG: {e}")

        except TimeoutError:
            continue
        except asyncio.CancelledError:
            logger.info(f"Continuous listening on {self.host}:{self.port} cancelled")
            break
        except Exception as e:
            logger.error(f"Error in continuous listening: {e}")
            if self.on_disconnect:
                self.on_disconnect()
            break

    self.running = False
    logger.info(f"Stopped continuous listening on {self.host}:{self.port}")

Continuously listen for messages and update internal offer/bond caches.

This method runs indefinitely until stop() is called or connection is lost. Used by orderbook_watcher and maker to maintain live orderbook state.

Args

request_orderbook
If True, send !orderbook request on startup to get current offers from makers. Set to False for maker bots that don't need to receive other offers.
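Example (usage sketch; `client` is assumed to be an already-connected DirectoryClient): run the listener as a background task and poll the offer/bond caches periodically:

import asyncio

async def watch_orderbook(client):
    listener = asyncio.create_task(client.listen_continuously(request_orderbook=True))
    try:
        for _ in range(10):
            await asyncio.sleep(30)
            offers = client.get_current_offers()
            bonds = client.get_current_bonds()
            print(f"cache: {len(offers)} offers, {len(bonds)} bonds")
    finally:
        client.stop()      # ends the listen loop on its next iteration
        await listener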
async def listen_for_messages(self, duration: float = 5.0) ‑> list[dict[str, typing.Any]]
Expand source code
async def listen_for_messages(self, duration: float = 5.0) -> list[dict[str, Any]]:
    """
    Listen for messages for a specified duration.

    This method collects all messages received within the specified duration.
    It properly handles connection closed errors by raising DirectoryClientError.

    Args:
        duration: How long to listen in seconds

    Returns:
        List of received messages

    Raises:
        DirectoryClientError: If not connected or connection is lost
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    # Check connection state before starting
    if not self.connection.is_connected():
        raise DirectoryClientError("Connection closed")

    messages: list[dict[str, Any]] = []
    start_time = asyncio.get_event_loop().time()

    while asyncio.get_event_loop().time() - start_time < duration:
        try:
            remaining_time = duration - (asyncio.get_event_loop().time() - start_time)
            if remaining_time <= 0:
                break

            response_data = await asyncio.wait_for(
                self.connection.receive(), timeout=remaining_time
            )
            response = json.loads(response_data.decode("utf-8"))
            logger.trace(
                f"Received message type {response.get('type')}: "
                f"{response.get('line', '')[:80]}..."
            )
            messages.append(response)

        except TimeoutError:
            # Normal timeout - no more messages within duration
            break
        except Exception as e:
            # Connection errors should propagate up so caller can reconnect
            error_msg = str(e).lower()
            if "connection" in error_msg and ("closed" in error_msg or "lost" in error_msg):
                raise DirectoryClientError(f"Connection lost: {e}") from e
            # Other errors (JSON parse, etc) - log and continue
            logger.warning(f"Error processing message: {e}")
            continue

    logger.trace(f"Collected {len(messages)} messages in {duration}s")
    return messages

Listen for messages for a specified duration.

This method collects all messages received within the specified duration. It properly handles connection closed errors by raising DirectoryClientError.

Args

duration
How long to listen in seconds

Returns

List of received messages

Raises

DirectoryClientError
If not connected or connection is lost
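Example (usage sketch; `client` is assumed connected, names as documented on this page). DirectoryClientError signals a lost connection so the caller can reconnect; per-message parse errors are already logged and skipped inside the method:

async def poll_once(client):
    try:
        messages = await client.listen_for_messages(duration=5.0)
    except DirectoryClientError:
        # Connection lost - reconnect before retrying.
        raise
    return [m for m in messages
            if m.get("type") in (MessageType.PUBMSG.value, MessageType.PRIVMSG.value)]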
async def send_private_message(self, recipient: str, command: str, data: str) ‑> None
Expand source code
async def send_private_message(self, recipient: str, command: str, data: str) -> None:
    """
    Send a signed private message to a specific peer.

    JoinMarket requires all private messages to be signed with the sender's
    nick private key. The signature is appended to the message:
    Format: "!<command> <data> <pubkey_hex> <signature>"

    The message-to-sign is: data + hostid (to prevent replay attacks)
    Note: Only the data is signed, NOT the command prefix.

    Args:
        recipient: Target peer nick
        command: Command name (without ! prefix, e.g., 'fill', 'auth', 'tx')
        data: Command arguments to send (will be signed)
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    # Sign just the data (not the command) with our nick identity
    # Reference: rawmessage = ' '.join(message[1:].split(' ')[1:-2])
    # This means they extract [1:-2] which is the args, not the command
    # So we sign: data + hostid
    signed_data = self.nick_identity.sign_message(data, self.hostid)

    # JoinMarket message format: from_nick!to_nick!command <args>
    # The COMMAND_PREFIX ("!") is used ONLY as a field separator between
    # from_nick, to_nick, and the message content. The command itself
    # does NOT have a "!" prefix.
    # Format: "<command> <signed_data>" where signed_data = "<data> <pubkey_hex> <sig_b64>"
    full_message = f"{command} {signed_data}"

    privmsg = {
        "type": MessageType.PRIVMSG.value,
        "line": f"{self.nick}!{recipient}!{full_message}",
    }
    await self.connection.send(json.dumps(privmsg).encode("utf-8"))

Send a signed private message to a specific peer.

JoinMarket requires all private messages to be signed with the sender's nick private key. The signature is appended to the message. Format: "!<command> <data> <pubkey_hex> <signature>"

The message-to-sign is data + hostid (to prevent replay attacks). Note: only the data is signed, NOT the command prefix.

Args

recipient
Target peer nick
command
Command name (without ! prefix, e.g., 'fill', 'auth', 'tx')
data
Command arguments to send (will be signed)
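Example (illustration only, with made-up values): the JSON line produced for a hypothetical 'fill' message. The '!' characters only separate from_nick, to_nick, and the content; the command itself carries no '!' prefix:

import json

nick, recipient, command = "J5Taker", "J5Maker", "fill"
data = "0 1000000 deadbeef"                                          # illustrative command arguments
signed_data = data + " " + "02" + "ab" * 32 + " " + "c2lnbmF0dXJl"   # "<data> <pubkey_hex> <sig_b64>" (fake values)
full_message = f"{command} {signed_data}"                            # no '!' before the command
envelope = {"type": 685, "line": f"{nick}!{recipient}!{full_message}"}  # 685 == PRIVMSG
print(json.dumps(envelope))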
async def send_public_message(self, message: str) ‑> None
Expand source code
async def send_public_message(self, message: str) -> None:
    """
    Send a public message to all peers.

    Args:
        message: Message to broadcast
    """
    if not self.connection:
        raise DirectoryClientError("Not connected")

    pubmsg = {
        "type": MessageType.PUBMSG.value,
        "line": f"{self.nick}!PUBLIC!{message}",
    }
    await self.connection.send(json.dumps(pubmsg).encode("utf-8"))

Send a public message to all peers.

Args

message
Message to broadcast
def stop(self) ‑> None
Expand source code
def stop(self) -> None:
    """Stop continuous listening."""
    self.running = False

Stop continuous listening.

def supports_extended_utxo_format(self) ‑> bool
Expand source code
def supports_extended_utxo_format(self) -> bool:
    """
    Check if we should use extended UTXO format with this directory.

    Extended format (txid:vout:scriptpubkey:blockheight) is used when
    both sides advertise neutrino_compat feature. Protocol version
    is not checked - features are negotiated independently.

    Returns:
        True if extended UTXO format should be used
    """
    return self.neutrino_compat and self.directory_neutrino_compat

Check if we should use extended UTXO format with this directory.

Extended format (txid:vout:scriptpubkey:blockheight) is used when both sides advertise neutrino_compat feature. Protocol version is not checked - features are negotiated independently.

Returns

True if extended UTXO format should be used
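Example (sketch with a hypothetical helper, not part of this module): how the two UTXO string formats differ depending on the result of supports_extended_utxo_format():

def format_utxo(txid: str, vout: int, scriptpubkey_hex: str, blockheight: int,
                extended: bool) -> str:
    # The extended format carries the scriptpubkey and confirmation height so that
    # neutrino/light-client peers can verify the UTXO without a full node.
    if extended:
        return f"{txid}:{vout}:{scriptpubkey_hex}:{blockheight}"
    return f"{txid}:{vout}"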

class DirectoryClientError (*args, **kwargs)
Expand source code
class DirectoryClientError(Exception):
    """Error raised by DirectoryClient operations."""

Error raised by DirectoryClient operations.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class DirectoryServerConfig (**data: Any)
Expand source code
class DirectoryServerConfig(BaseModel):
    """
    Configuration for directory server instances.

    Used by standalone directory servers, not by clients.
    """

    network: NetworkType = Field(
        default=NetworkType.MAINNET, description="Network type for the directory server"
    )
    host: str = Field(default="127.0.0.1", description="Host address to bind to")
    port: int = Field(default=5222, ge=1, le=65535, description="Port to listen on")

    # Limits
    max_peers: int = Field(default=10000, ge=1, description="Maximum number of connected peers")
    max_message_size: int = Field(
        default=2097152, ge=1024, description="Maximum message size in bytes (default: 2MB)"
    )
    max_line_length: int = Field(
        default=65536, ge=1024, description="Maximum JSON-line message length (default: 64KB)"
    )
    max_json_nesting_depth: int = Field(
        default=10, ge=1, le=100, description="Maximum nesting depth for JSON parsing"
    )

    # Rate limiting
    # Higher limits to accommodate makers responding to orderbook requests
    # A single maker might send multiple offer messages + bond proofs rapidly
    message_rate_limit: int = Field(
        default=500, ge=1, description="Messages per second (sustained)"
    )
    message_burst_limit: int = Field(default=1000, ge=1, description="Maximum burst size")
    rate_limit_disconnect_threshold: int = Field(
        default=200, ge=1, description="Disconnect after N violations"
    )

    # Broadcasting
    broadcast_batch_size: int = Field(
        default=50,
        ge=1,
        description="Batch size for concurrent broadcasts (lower = less memory)",
    )

    # Logging
    log_level: str = Field(default="INFO", description="Logging level")

    # Server info
    motd: str = Field(
        default="JoinMarket Directory Server https://github.com/m0wer/joinmarket-ng",
        description="Message of the day sent to clients",
    )

    # Health check
    health_check_host: str = Field(
        default="127.0.0.1", description="Host for health check endpoint"
    )
    health_check_port: int = Field(
        default=8080, ge=1, le=65535, description="Port for health check endpoint"
    )

    model_config = {"frozen": False}

Configuration for directory server instances.

Used by standalone directory servers, not by clients.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.
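Example (usage sketch; import omitted, name as documented on this page): constructing a config with relaxed limits for a local or regtest deployment; unspecified fields keep the defaults listed below:

config = DirectoryServerConfig(
    host="0.0.0.0",
    port=5222,
    max_peers=500,
    message_rate_limit=100,
    message_burst_limit=200,
    log_level="DEBUG",
)
print(config.model_dump())   # standard pydantic v2 serialization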

Ancestors

  • pydantic.main.BaseModel

Class variables

var broadcast_batch_size : int
var health_check_host : str
var health_check_port : int
var host : str
var log_level : str
var max_json_nesting_depth : int
var max_line_length : int
var max_message_size : int
var max_peers : int
var message_burst_limit : int
var message_rate_limit : int
var model_config
var motd : str
var network : NetworkType
var port : int
var rate_limit_disconnect_threshold : int

class EphemeralHiddenService (service_id: str,
private_key: str | None = None,
ports: list[tuple[int, str]] | None = None)
Expand source code
class EphemeralHiddenService:
    """
    Represents an ephemeral hidden service created via Tor control port.

    Ephemeral hidden services are transient - they exist only while
    the control connection is open. When the connection closes,
    the hidden service is automatically removed.
    """

    def __init__(
        self,
        service_id: str,
        private_key: str | None = None,
        ports: list[tuple[int, str]] | None = None,
    ):
        """
        Initialize ephemeral hidden service info.

        Args:
            service_id: The .onion address without .onion suffix (56 chars for v3)
            private_key: Optional private key for recreating the service
            ports: List of (virtual_port, target) mappings
        """
        self.service_id = service_id
        self.private_key = private_key
        self.ports = ports or []

    @property
    def onion_address(self) -> str:
        """Get the full .onion address."""
        return f"{self.service_id}.onion"

    def __repr__(self) -> str:
        return f"EphemeralHiddenService({self.onion_address}, ports={self.ports})"

Represents an ephemeral hidden service created via Tor control port.

Ephemeral hidden services are transient - they exist only while the control connection is open. When the connection closes, the hidden service is automatically removed.

Initialize ephemeral hidden service info.

Args

service_id
The .onion address without .onion suffix (56 chars for v3)
private_key
Optional private key for recreating the service
ports
List of (virtual_port, target) mappings
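Example (illustrative values): wrapping the result of an ADD_ONION call; the private key is omitted here, so the service cannot be recreated later:

svc = EphemeralHiddenService(
    service_id="a" * 56,                  # v3 service id, without the ".onion" suffix
    private_key=None,
    ports=[(5222, "127.0.0.1:5222")],     # virtual port -> local target
)
print(svc.onion_address)                  # 56 'a' characters followed by ".onion"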

Instance variables

prop onion_address : str
Expand source code
@property
def onion_address(self) -> str:
    """Get the full .onion address."""
    return f"{self.service_id}.onion"

Get the full .onion address.

class FeatureSet (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class FeatureSet:
    """
    Represents a set of protocol features advertised by a peer.

    Used for feature negotiation during handshake and CoinJoin sessions.
    """

    features: set[str] = Field(default_factory=set)

    @classmethod
    def from_handshake(cls, handshake_data: dict[str, Any]) -> FeatureSet:
        """Extract features from a handshake payload."""
        features_dict = handshake_data.get("features", {})
        # Only include features that are set to True
        features = {k for k, v in features_dict.items() if v is True}
        return cls(features=features)

    @classmethod
    def from_list(cls, feature_list: list[str]) -> FeatureSet:
        """Create from a list of feature names."""
        return cls(features=set(feature_list))

    @classmethod
    def from_comma_string(cls, s: str) -> FeatureSet:
        """Parse from plus-separated string (e.g., 'neutrino_compat+push_encrypted').

        Note: Despite the method name, uses '+' as separator because the peerlist
        itself uses ',' to separate entries. The name is kept for backward compatibility.
        Also accepts ',' for legacy/handshake use cases.
        """
        if not s or not s.strip():
            return cls(features=set())
        # Support both + (peerlist) and , (legacy/handshake) separators
        if "+" in s:
            return cls(features={f.strip() for f in s.split("+") if f.strip()})
        return cls(features={f.strip() for f in s.split(",") if f.strip()})

    def to_dict(self) -> dict[str, bool]:
        """Convert to dict for JSON serialization."""
        return dict.fromkeys(sorted(self.features), True)

    def to_comma_string(self) -> str:
        """Convert to plus-separated string for peerlist F: suffix.

        Note: Uses '+' as separator instead of ',' because the peerlist
        itself uses ',' to separate entries. Using ',' for features would
        cause parsing ambiguity.
        """
        return "+".join(sorted(self.features))

    def supports(self, feature: str) -> bool:
        """Check if this set includes a specific feature."""
        return feature in self.features

    def supports_neutrino_compat(self) -> bool:
        """Check if neutrino_compat is supported."""
        return FEATURE_NEUTRINO_COMPAT in self.features

    def supports_push_encrypted(self) -> bool:
        """Check if push_encrypted is supported."""
        return FEATURE_PUSH_ENCRYPTED in self.features

    def supports_peerlist_features(self) -> bool:
        """Check if peer supports extended peerlist with features (F: suffix)."""
        return FEATURE_PEERLIST_FEATURES in self.features

    def validate_dependencies(self) -> tuple[bool, str]:
        """Check that all feature dependencies are satisfied."""
        for feature in self.features:
            deps = FEATURE_DEPENDENCIES.get(feature, [])
            for dep in deps:
                if dep not in self.features:
                    return False, f"Feature '{feature}' requires '{dep}'"
        return True, ""

    def intersection(self, other: FeatureSet) -> FeatureSet:
        """Return features supported by both sets."""
        return FeatureSet(features=self.features & other.features)

    def __bool__(self) -> bool:
        """True if any features are set."""
        return bool(self.features)

    def __contains__(self, feature: str) -> bool:
        return feature in self.features

    def __iter__(self):
        return iter(self.features)

    def __len__(self) -> int:
        return len(self.features)

Represents a set of protocol features advertised by a peer.

Used for feature negotiation during handshake and CoinJoin sessions.

Static methods

def from_comma_string(s: str) ‑> FeatureSet

Parse from plus-separated string (e.g., 'neutrino_compat+push_encrypted').

Note: Despite the method name, uses '+' as separator because the peerlist itself uses ',' to separate entries. The name is kept for backward compatibility. Also accepts ',' for legacy/handshake use cases.
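Example: round-tripping a peerlist feature suffix and intersecting it with a locally supported set:

theirs = FeatureSet.from_comma_string("neutrino_compat+push_encrypted")
ours = FeatureSet.from_list(["neutrino_compat", "peerlist_features"])
common = theirs.intersection(ours)
print(common.supports_neutrino_compat())   # True
print(theirs.to_comma_string())            # "neutrino_compat+push_encrypted"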

def from_handshake(handshake_data: dict[str, Any]) ‑> FeatureSet

Extract features from a handshake payload.

def from_list(feature_list: list[str]) ‑> FeatureSet

Create from a list of feature names.

Instance variables

var features : set[str]

Methods

def intersection(self,
other: FeatureSet) ‑> FeatureSet
Expand source code
def intersection(self, other: FeatureSet) -> FeatureSet:
    """Return features supported by both sets."""
    return FeatureSet(features=self.features & other.features)

Return features supported by both sets.

def supports(self, feature: str) ‑> bool
Expand source code
def supports(self, feature: str) -> bool:
    """Check if this set includes a specific feature."""
    return feature in self.features

Check if this set includes a specific feature.

def supports_neutrino_compat(self) ‑> bool
Expand source code
def supports_neutrino_compat(self) -> bool:
    """Check if neutrino_compat is supported."""
    return FEATURE_NEUTRINO_COMPAT in self.features

Check if neutrino_compat is supported.

def supports_peerlist_features(self) ‑> bool
Expand source code
def supports_peerlist_features(self) -> bool:
    """Check if peer supports extended peerlist with features (F: suffix)."""
    return FEATURE_PEERLIST_FEATURES in self.features

Check if peer supports extended peerlist with features (F: suffix).

def supports_push_encrypted(self) ‑> bool
Expand source code
def supports_push_encrypted(self) -> bool:
    """Check if push_encrypted is supported."""
    return FEATURE_PUSH_ENCRYPTED in self.features

Check if push_encrypted is supported.

def to_comma_string(self) ‑> str
Expand source code
def to_comma_string(self) -> str:
    """Convert to plus-separated string for peerlist F: suffix.

    Note: Uses '+' as separator instead of ',' because the peerlist
    itself uses ',' to separate entries. Using ',' for features would
    cause parsing ambiguity.
    """
    return "+".join(sorted(self.features))

Convert to plus-separated string for peerlist F: suffix.

Note: Uses '+' as separator instead of ',' because the peerlist itself uses ',' to separate entries. Using ',' for features would cause parsing ambiguity.

def to_dict(self) ‑> dict[str, bool]
Expand source code
def to_dict(self) -> dict[str, bool]:
    """Convert to dict for JSON serialization."""
    return dict.fromkeys(sorted(self.features), True)

Convert to dict for JSON serialization.

def validate_dependencies(self) ‑> tuple[bool, str]
Expand source code
def validate_dependencies(self) -> tuple[bool, str]:
    """Check that all feature dependencies are satisfied."""
    for feature in self.features:
        deps = FEATURE_DEPENDENCIES.get(feature, [])
        for dep in deps:
            if dep not in self.features:
                return False, f"Feature '{feature}' requires '{dep}'"
    return True, ""

Check that all feature dependencies are satisfied.

class MessageEnvelope (**data: Any)
Expand source code
class MessageEnvelope(BaseModel):
    message_type: int = Field(..., ge=0)
    payload: str
    timestamp: datetime = Field(default_factory=lambda: datetime.now(UTC))

    def to_bytes(self) -> bytes:
        import json

        result = json.dumps({"type": self.message_type, "line": self.payload}).encode("utf-8")
        return result

    @classmethod
    def from_bytes(
        cls, data: bytes, max_line_length: int = 65536, max_json_nesting_depth: int = 10
    ) -> MessageEnvelope:
        """
        Parse a message envelope from bytes with security limits.

        Args:
            data: Raw message bytes (without \\r\\n terminator)
            max_line_length: Maximum allowed line length in bytes (default 64KB)
            max_json_nesting_depth: Maximum JSON nesting depth (default 10)

        Returns:
            Parsed MessageEnvelope

        Raises:
            MessageParsingError: If message exceeds security limits
            json.JSONDecodeError: If JSON is malformed
        """
        import json

        # Check line length BEFORE parsing to prevent DoS
        if len(data) > max_line_length:
            raise MessageParsingError(
                f"Message line length {len(data)} exceeds maximum of {max_line_length} bytes"
            )

        # Parse JSON
        obj = json.loads(data)

        # Validate nesting depth BEFORE creating model
        validate_json_nesting_depth(obj, max_json_nesting_depth)

        return cls(message_type=obj["type"], payload=obj["line"])

Wire-format envelope for directory messages: a numeric message_type plus a string payload, serialized by to_bytes() as the JSON object {"type": ..., "line": ...} and parsed back by from_bytes() with security limits.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var message_type : int
var model_config
var payload : str
var timestamp : datetime.datetime

Static methods

def from_bytes(data: bytes, max_line_length: int = 65536, max_json_nesting_depth: int = 10) ‑> MessageEnvelope

Parse a message envelope from bytes with security limits.

Args

data
Raw message bytes (without \r\n terminator)
max_line_length
Maximum allowed line length in bytes (default 64KB)
max_json_nesting_depth
Maximum JSON nesting depth (default 10)

Returns

Parsed MessageEnvelope

Raises

MessageParsingError
If message exceeds security limits
json.JSONDecodeError
If JSON is malformed

Methods

def to_bytes(self) ‑> bytes
Expand source code
def to_bytes(self) -> bytes:
    import json

    result = json.dumps({"type": self.message_type, "line": self.payload}).encode("utf-8")
    return result
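Example: serializing an envelope and parsing it back with the documented security limits (687 is MessageType.PUBMSG):

env = MessageEnvelope(message_type=687, payload="J5nick!PUBLIC!!orderbook")
raw = env.to_bytes()
parsed = MessageEnvelope.from_bytes(raw, max_line_length=65536, max_json_nesting_depth=10)
assert parsed.payload == env.payload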
class MessageType (*values)
Expand source code
class MessageType(IntEnum):
    PRIVMSG = 685
    PUBMSG = 687
    PEERLIST = 789
    GETPEERLIST = 791
    HANDSHAKE = 793
    DN_HANDSHAKE = 795
    PING = 797
    PONG = 799
    DISCONNECT = 801

    CONNECT = 785
    CONNECT_IN = 797

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var CONNECT
var CONNECT_IN
var DISCONNECT
var DN_HANDSHAKE
var GETPEERLIST
var HANDSHAKE
var PEERLIST
var PING
var PONG
var PRIVMSG
var PUBMSG

class NaclError (*args, **kwargs)
Expand source code
class NaclError(Exception):
    """Exception for NaCl encryption errors."""

    pass

Exception for NaCl encryption errors.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ParsedTransaction (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class ParsedTransaction:
    """Parsed Bitcoin transaction."""

    version: int
    inputs: list[dict[str, Any]]
    outputs: list[dict[str, Any]]
    witnesses: list[list[bytes]]
    locktime: int
    has_witness: bool

Parsed Bitcoin transaction.

Instance variables

var has_witness : bool
var inputs : list[dict[str, typing.Any]]
var locktime : int
var outputs : list[dict[str, typing.Any]]
var version : int
var witnesses : list[list[bytes]]

class PeerInfo (**data: Any)
Expand source code
class PeerInfo(BaseModel):
    nick: str = Field(..., min_length=1, max_length=64)
    onion_address: str = Field(..., pattern=r"^[a-z2-7]{56}\.onion$|^NOT-SERVING-ONION$")
    port: int = Field(..., ge=-1, le=65535)
    status: PeerStatus = PeerStatus.UNCONNECTED
    is_directory: bool = False
    network: NetworkType = NetworkType.MAINNET
    last_seen: datetime | None = None
    features: dict[str, Any] = Field(default_factory=dict)
    protocol_version: int = Field(default=5, ge=5, le=10)  # Negotiated protocol version
    neutrino_compat: bool = False  # True if peer supports extended UTXO metadata

    @field_validator("onion_address")
    @classmethod
    def validate_onion(cls, v: str) -> str:
        if v == "NOT-SERVING-ONION":
            return v
        if not v.endswith(".onion"):
            raise ValueError("Invalid onion address")
        return v

    @field_validator("port")
    @classmethod
    def validate_port(cls, v: int, info) -> int:
        if v == -1 and info.data.get("onion_address") == "NOT-SERVING-ONION":
            return v
        if v < 1 or v > 65535:
            raise ValueError("Port must be between 1 and 65535")
        return v

    @cached_property
    def location_string(self) -> str:
        if self.onion_address == "NOT-SERVING-ONION":
            return "NOT-SERVING-ONION"
        return f"{self.onion_address}:{self.port}"

    def supports_extended_utxo(self) -> bool:
        """Check if this peer supports extended UTXO format (neutrino_compat)."""
        # With feature-based detection, we check the neutrino_compat flag
        # which is set from the features dict during handshake
        return self.neutrino_compat

    model_config = {"frozen": False}

Information about a JoinMarket peer: nick, onion location (or NOT-SERVING-ONION for non-listening peers), port, connection status, network, advertised features, negotiated protocol version, and neutrino_compat support.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var features : dict[str, typing.Any]
var is_directory : bool
var last_seen : datetime.datetime | None
var model_config
var network : NetworkType
var neutrino_compat : bool
var nick : str
var onion_address : str
var port : int
var protocol_version : int
var status : PeerStatus

Static methods

def validate_onion(v: str) ‑> str
def validate_port(v: int, info) ‑> int

Instance variables

var location_string : str
Expand source code
@cached_property
def location_string(self) -> str:
    if self.onion_address == "NOT-SERVING-ONION":
        return "NOT-SERVING-ONION"
    return f"{self.onion_address}:{self.port}"

Methods

def supports_extended_utxo(self) ‑> bool
Expand source code
def supports_extended_utxo(self) -> bool:
    """Check if this peer supports extended UTXO format (neutrino_compat)."""
    # With feature-based detection, we check the neutrino_compat flag
    # which is set from the features dict during handshake
    return self.neutrino_compat

Check if this peer supports extended UTXO format (neutrino_compat).
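A minimal construction sketch (a hedged example, not taken from the library; the nick, port, and 56-character onion string below are placeholders):

from datetime import datetime, timezone

# Placeholder values; a real onion address comes from the peer's handshake.
peer = PeerInfo(
    nick="J5ExampleMaker",
    onion_address="a" * 56 + ".onion",
    port=5222,
    features={"neutrino_compat": True},
    neutrino_compat=True,
    last_seen=datetime.now(timezone.utc),
)
print(peer.location_string)           # "<onion address>:5222"
print(peer.supports_extended_utxo())  # True

# A peer that accepts no inbound connections:
passive = PeerInfo(nick="taker1", onion_address="NOT-SERVING-ONION", port=-1)
print(passive.location_string)        # "NOT-SERVING-ONION"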

class PoDLECommitment (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class PoDLECommitment:
    """PoDLE commitment data generated by taker."""

    commitment: bytes  # H(P2) - 32 bytes
    p: bytes  # Public key P = k*G - 33 bytes compressed
    p2: bytes  # Commitment point P2 = k*J - 33 bytes compressed
    sig: bytes  # Schnorr signature s - 32 bytes
    e: bytes  # Challenge e - 32 bytes
    utxo: str  # UTXO reference "txid:vout"
    index: int  # NUMS point index used

    def to_revelation(self) -> dict[str, str]:
        """Convert to revelation format for sending to maker."""
        return {
            "P": self.p.hex(),
            "P2": self.p2.hex(),
            "sig": self.sig.hex(),
            "e": self.e.hex(),
            "utxo": self.utxo,
        }

    def to_commitment_str(self) -> str:
        """
        Get commitment as string with type prefix.

        JoinMarket requires a commitment type prefix to allow future
        commitment schemes. "P" indicates a standard PoDLE commitment.
        Format: "P" + hex(commitment)
        """
        return "P" + self.commitment.hex()

PoDLE commitment data generated by taker.

Instance variables

var commitment : bytes
var e : bytes
var index : int
var p : bytes
var p2 : bytes
var sig : bytes
var utxo : str

Methods

def to_commitment_str(self) ‑> str
Expand source code
def to_commitment_str(self) -> str:
    """
    Get commitment as string with type prefix.

    JoinMarket requires a commitment type prefix to allow future
    commitment schemes. "P" indicates a standard PoDLE commitment.
    Format: "P" + hex(commitment)
    """
    return "P" + self.commitment.hex()

Get commitment as string with type prefix.

JoinMarket requires a commitment type prefix to allow future commitment schemes. "P" indicates a standard PoDLE commitment. Format: "P" + hex(commitment)

def to_revelation(self) ‑> dict[str, str]
Expand source code
def to_revelation(self) -> dict[str, str]:
    """Convert to revelation format for sending to maker."""
    return {
        "P": self.p.hex(),
        "P2": self.p2.hex(),
        "sig": self.sig.hex(),
        "e": self.e.hex(),
        "utxo": self.utxo,
    }

Convert to revelation format for sending to maker.
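A serialization sketch (hedged; every byte value below is a random dummy of the documented length, not a valid PoDLE proof):

import os

podle = PoDLECommitment(
    commitment=os.urandom(32),        # H(P2) placeholder
    p=b"\x02" + os.urandom(32),       # 33-byte compressed point placeholder
    p2=b"\x02" + os.urandom(32),
    sig=os.urandom(32),
    e=os.urandom(32),
    utxo="ff" * 32 + ":0",            # dummy txid:vout
    index=0,
)
print(podle.to_commitment_str())      # "P" followed by 64 hex characters
print(sorted(podle.to_revelation()))  # ['P', 'P2', 'e', 'sig', 'utxo']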

class PoDLEError (*args, **kwargs)
Expand source code
class PoDLEError(Exception):
    """PoDLE generation or verification error."""

    pass

PoDLE generation or verification error.

Ancestors

  • builtins.Exception
  • builtins.BaseException
class ProtocolMessage (**data: Any)
Expand source code
class ProtocolMessage(BaseModel):
    type: MessageType
    payload: dict[str, Any]

    def to_json(self) -> str:
        return json.dumps({"type": self.type.value, "data": self.payload})

    @classmethod
    def from_json(cls, data: str) -> ProtocolMessage:
        obj = json.loads(data)
        return cls(type=MessageType(obj["type"]), payload=obj["data"])

    def to_bytes(self) -> bytes:
        return self.to_json().encode("utf-8")

    @classmethod
    def from_bytes(cls, data: bytes) -> ProtocolMessage:
        return cls.from_json(data.decode("utf-8"))


Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var model_config
var payload : dict[str, typing.Any]
var type : MessageType

Static methods

def from_bytes(data: bytes) ‑> ProtocolMessage
def from_json(data: str) ‑> ProtocolMessage

Methods

def to_bytes(self) ‑> bytes
Expand source code
def to_bytes(self) -> bytes:
    return self.to_json().encode("utf-8")
def to_json(self) ‑> str
Expand source code
def to_json(self) -> str:
    return json.dumps({"type": self.type.value, "data": self.payload})
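
A round-trip sketch (hedged; it assumes MessageType.PUBMSG from the message-type enum documented above, and the payload keys are illustrative):

msg = ProtocolMessage(
    type=MessageType.PUBMSG,
    payload={"nick": "J5ExampleTaker", "message": "!orderbook"},
)
wire = msg.to_bytes()                       # UTF-8 encoded JSON
restored = ProtocolMessage.from_bytes(wire)
assert restored.type == MessageType.PUBMSG
assert restored.payload["message"] == "!orderbook"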
class RateLimiter (rate_limit: int = 10,
burst_limit: int | None = None,
disconnect_threshold: int | None = None)
Expand source code
class RateLimiter:
    """
    Per-peer rate limiter using token bucket algorithm.

    Configuration:
    - rate_limit: messages per second (sustained rate)
    - burst_limit: maximum burst size (default: 10x rate_limit)
    - disconnect_threshold: violations before disconnect (default: None = never)

    Default settings (10 msg/sec sustained, 100 msg burst):
    - Allows ~10 seconds of continuous max-rate traffic before throttling
    - Prevents DoS from rapid spam while allowing legitimate burst patterns
    - Example: taker requesting orderbook from multiple makers simultaneously

    Security:
    - Rate limit by connection ID, not self-declared nick, to prevent impersonation
    - Nick spoofing attack: attacker claims victim's nick to get them rate limited
    - Use connection-based keys until identity is cryptographically verified
    """

    @validate_call
    def __init__(
        self,
        rate_limit: int = 10,
        burst_limit: int | None = None,
        disconnect_threshold: int | None = None,
    ):
        """
        Initialize rate limiter.

        Args:
            rate_limit: Maximum messages per second (sustained, default: 10)
            burst_limit: Maximum burst size (default: 10x rate_limit = 100)
            disconnect_threshold: Violations before disconnect (None = never disconnect)
        """
        self.rate_limit = rate_limit
        self.burst_limit = burst_limit or (rate_limit * 10)
        self.disconnect_threshold = disconnect_threshold
        self._buckets: dict[str, TokenBucket] = {}
        self._violation_counts: dict[str, int] = {}

    def check(self, peer_key: str) -> tuple[RateLimitAction, float]:
        """
        Check rate limit and return recommended action.

        Returns:
            Tuple of (action, delay_seconds):
            - ALLOW: Message allowed, delay=0
            - DELAY: Message should be delayed/dropped, delay=recommended wait time
            - DISCONNECT: Peer should be disconnected (severe abuse), delay=0
        """
        if peer_key not in self._buckets:
            self._buckets[peer_key] = TokenBucket(
                capacity=self.burst_limit,
                refill_rate=float(self.rate_limit),
            )

        bucket = self._buckets[peer_key]
        allowed = bucket.consume()

        if allowed:
            return (RateLimitAction.ALLOW, 0.0)

        # Rate limited - increment violation count
        self._violation_counts[peer_key] = self._violation_counts.get(peer_key, 0) + 1
        violations = self._violation_counts[peer_key]

        # Check if we should disconnect (only if threshold is set)
        if self.disconnect_threshold is not None and violations >= self.disconnect_threshold:
            return (RateLimitAction.DISCONNECT, 0.0)

        # Otherwise, recommend delay
        delay = bucket.get_delay_seconds()
        return (RateLimitAction.DELAY, delay)

    def remove_peer(self, peer_key: str) -> None:
        """Remove rate limit state for a disconnected peer."""
        self._buckets.pop(peer_key, None)
        self._violation_counts.pop(peer_key, None)

    def get_violation_count(self, peer_key: str) -> int:
        """Get the number of rate limit violations for a peer."""
        return self._violation_counts.get(peer_key, 0)

    def get_delay_for_peer(self, peer_key: str) -> float:
        """Get recommended delay in seconds for a rate-limited peer."""
        bucket = self._buckets.get(peer_key)
        if bucket is None:
            return 0.0
        return bucket.get_delay_seconds()

    def cleanup_old_peers(self, max_idle_seconds: float = 3600.0) -> int:
        """
        Remove peers that haven't sent messages in max_idle_seconds.

        Returns the number of peers removed.
        """
        now = time.monotonic()
        stale_peers = [
            peer_key
            for peer_key, bucket in self._buckets.items()
            if now - bucket.last_refill > max_idle_seconds
        ]

        for peer_key in stale_peers:
            self.remove_peer(peer_key)

        return len(stale_peers)

    def get_stats(self) -> dict:
        """Get rate limiter statistics."""
        return {
            "tracked_peers": len(self._buckets),
            "total_violations": sum(self._violation_counts.values()),
            "top_violators": sorted(
                self._violation_counts.items(),
                key=lambda x: x[1],
                reverse=True,
            )[:10],
        }

    def clear(self) -> None:
        """Clear all rate limit state."""
        self._buckets.clear()
        self._violation_counts.clear()

Per-peer rate limiter using token bucket algorithm.

Configuration:

  • rate_limit: messages per second (sustained rate)
  • burst_limit: maximum burst size (default: 10x rate_limit)
  • disconnect_threshold: violations before disconnect (default: None = never)

Default settings (10 msg/sec sustained, 100 msg burst):

  • Allows ~10 seconds of continuous max-rate traffic before throttling
  • Prevents DoS from rapid spam while allowing legitimate burst patterns
  • Example: taker requesting orderbook from multiple makers simultaneously

Security:

  • Rate limit by connection ID, not self-declared nick, to prevent impersonation
  • Nick spoofing attack: attacker claims victim's nick to get them rate limited
  • Use connection-based keys until identity is cryptographically verified

Initialize rate limiter.

Args

rate_limit
Maximum messages per second (sustained, default: 10)
burst_limit
Maximum burst size (default: 10x rate_limit = 100)
disconnect_threshold
Violations before disconnect (None = never disconnect)

Methods

def check(self, peer_key: str) ‑> tuple[RateLimitAction, float]
Expand source code
def check(self, peer_key: str) -> tuple[RateLimitAction, float]:
    """
    Check rate limit and return recommended action.

    Returns:
        Tuple of (action, delay_seconds):
        - ALLOW: Message allowed, delay=0
        - DELAY: Message should be delayed/dropped, delay=recommended wait time
        - DISCONNECT: Peer should be disconnected (severe abuse), delay=0
    """
    if peer_key not in self._buckets:
        self._buckets[peer_key] = TokenBucket(
            capacity=self.burst_limit,
            refill_rate=float(self.rate_limit),
        )

    bucket = self._buckets[peer_key]
    allowed = bucket.consume()

    if allowed:
        return (RateLimitAction.ALLOW, 0.0)

    # Rate limited - increment violation count
    self._violation_counts[peer_key] = self._violation_counts.get(peer_key, 0) + 1
    violations = self._violation_counts[peer_key]

    # Check if we should disconnect (only if threshold is set)
    if self.disconnect_threshold is not None and violations >= self.disconnect_threshold:
        return (RateLimitAction.DISCONNECT, 0.0)

    # Otherwise, recommend delay
    delay = bucket.get_delay_seconds()
    return (RateLimitAction.DELAY, delay)

Check rate limit and return recommended action.

Returns

Tuple of (action, delay_seconds):

  • ALLOW: Message allowed, delay=0
  • DELAY: Message should be delayed/dropped, delay=recommended wait time
  • DISCONNECT: Peer should be disconnected (severe abuse), delay=0
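A dispatch sketch for acting on the returned action (hedged; process_message and disconnect_peer are hypothetical callbacks, not part of jmcore):

limiter = RateLimiter(rate_limit=10, burst_limit=100, disconnect_threshold=50)

def on_message(peer_key: str, raw: bytes) -> None:
    action, delay = limiter.check(peer_key)
    if action == RateLimitAction.ALLOW:
        process_message(peer_key, raw)      # hypothetical message handler
    elif action == RateLimitAction.DELAY:
        print(f"rate limited {peer_key}, retry in {delay:.2f}s")
    else:  # RateLimitAction.DISCONNECT
        disconnect_peer(peer_key)           # hypothetical disconnect hook
        limiter.remove_peer(peer_key)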

def cleanup_old_peers(self, max_idle_seconds: float = 3600.0) ‑> int
Expand source code
def cleanup_old_peers(self, max_idle_seconds: float = 3600.0) -> int:
    """
    Remove peers that haven't sent messages in max_idle_seconds.

    Returns the number of peers removed.
    """
    now = time.monotonic()
    stale_peers = [
        peer_key
        for peer_key, bucket in self._buckets.items()
        if now - bucket.last_refill > max_idle_seconds
    ]

    for peer_key in stale_peers:
        self.remove_peer(peer_key)

    return len(stale_peers)

Remove peers that haven't sent messages in max_idle_seconds.

Returns the number of peers removed.

def clear(self) ‑> None
Expand source code
def clear(self) -> None:
    """Clear all rate limit state."""
    self._buckets.clear()
    self._violation_counts.clear()

Clear all rate limit state.

def get_delay_for_peer(self, peer_key: str) ‑> float
Expand source code
def get_delay_for_peer(self, peer_key: str) -> float:
    """Get recommended delay in seconds for a rate-limited peer."""
    bucket = self._buckets.get(peer_key)
    if bucket is None:
        return 0.0
    return bucket.get_delay_seconds()

Get recommended delay in seconds for a rate-limited peer.

def get_stats(self) ‑> dict
Expand source code
def get_stats(self) -> dict:
    """Get rate limiter statistics."""
    return {
        "tracked_peers": len(self._buckets),
        "total_violations": sum(self._violation_counts.values()),
        "top_violators": sorted(
            self._violation_counts.items(),
            key=lambda x: x[1],
            reverse=True,
        )[:10],
    }

Get rate limiter statistics.

def get_violation_count(self, peer_key: str) ‑> int
Expand source code
def get_violation_count(self, peer_key: str) -> int:
    """Get the number of rate limit violations for a peer."""
    return self._violation_counts.get(peer_key, 0)

Get the number of rate limit violations for a peer.

def remove_peer(self, peer_key: str) ‑> None
Expand source code
def remove_peer(self, peer_key: str) -> None:
    """Remove rate limit state for a disconnected peer."""
    self._buckets.pop(peer_key, None)
    self._violation_counts.pop(peer_key, None)

Remove rate limit state for a disconnected peer.

class RequiredFeatures (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class RequiredFeatures:
    """
    Features that this peer requires from counterparties.

    Used to filter incompatible peers during maker selection.
    """

    required: set[str] = Field(default_factory=set)

    @classmethod
    def for_neutrino_taker(cls) -> RequiredFeatures:
        """Create requirements for a taker using Neutrino backend."""
        return cls(required={FEATURE_NEUTRINO_COMPAT})

    @classmethod
    def none(cls) -> RequiredFeatures:
        """No required features."""
        return cls(required=set())

    def is_compatible(self, peer_features: FeatureSet) -> tuple[bool, str]:
        """Check if peer supports all required features."""
        missing = self.required - peer_features.features
        if missing:
            return False, f"Missing required features: {missing}"
        return True, ""

    def __bool__(self) -> bool:
        return bool(self.required)

Features that this peer requires from counterparties.

Used to filter incompatible peers during maker selection.

Static methods

def for_neutrino_taker() ‑> RequiredFeatures

Create requirements for a taker using Neutrino backend.

def none() ‑> RequiredFeatures

No required features.

Instance variables

var required : set[str]

Methods

def is_compatible(self,
peer_features: FeatureSet) ‑> tuple[bool, str]
Expand source code
def is_compatible(self, peer_features: FeatureSet) -> tuple[bool, str]:
    """Check if peer supports all required features."""
    missing = self.required - peer_features.features
    if missing:
        return False, f"Missing required features: {missing}"
    return True, ""

Check if peer supports all required features.
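A maker-filtering sketch (hedged; candidate_features is assumed to be a FeatureSet exposing a features set of strings, as is_compatible() implies):

reqs = RequiredFeatures.for_neutrino_taker()

ok, reason = reqs.is_compatible(candidate_features)  # candidate_features: FeatureSet
if not ok:
    print(f"skipping maker: {reason}")

# An empty requirement set is falsy, so filtering can be skipped entirely:
if not RequiredFeatures.none():
    print("no required features; all peers are acceptable")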

class TokenBucket (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class TokenBucket:
    """
    Token bucket for rate limiting.

    Tokens are added at a fixed rate up to a maximum capacity.
    Each message consumes one token. If no tokens are available,
    the message is rejected.
    """

    capacity: int  # Maximum tokens (burst allowance)
    refill_rate: float  # Tokens per second
    tokens: float = Field(init=False)
    last_refill: float = Field(init=False)

    def __post_init__(self) -> None:
        self.tokens = float(self.capacity)
        self.last_refill = time.monotonic()

    def consume(self, tokens: int = 1) -> bool:
        """
        Try to consume tokens. Returns True if successful, False if rate limited.
        """
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.last_refill = now

        # Refill tokens based on elapsed time
        self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)

        if self.tokens >= tokens:
            self.tokens -= tokens
            return True
        return False

    def get_delay_seconds(self) -> float:
        """
        Get recommended delay in seconds before next message would be allowed.
        Returns 0 if tokens are available.
        """
        if self.tokens >= 1:
            return 0.0
        # Calculate time needed to refill 1 token
        tokens_needed = 1 - self.tokens
        return tokens_needed / self.refill_rate

    def reset(self) -> None:
        """Reset bucket to full capacity."""
        self.tokens = float(self.capacity)
        self.last_refill = time.monotonic()

Token bucket for rate limiting.

Tokens are added at a fixed rate up to a maximum capacity. Each message consumes one token. If no tokens are available, the message is rejected.

Instance variables

var capacity : int
var last_refill : float
var refill_rate : float
var tokens : float

Methods

def consume(self, tokens: int = 1) ‑> bool
Expand source code
def consume(self, tokens: int = 1) -> bool:
    """
    Try to consume tokens. Returns True if successful, False if rate limited.
    """
    now = time.monotonic()
    elapsed = now - self.last_refill
    self.last_refill = now

    # Refill tokens based on elapsed time
    self.tokens = min(self.capacity, self.tokens + elapsed * self.refill_rate)

    if self.tokens >= tokens:
        self.tokens -= tokens
        return True
    return False

Try to consume tokens. Returns True if successful, False if rate limited.

def get_delay_seconds(self) ‑> float
Expand source code
def get_delay_seconds(self) -> float:
    """
    Get recommended delay in seconds before next message would be allowed.
    Returns 0 if tokens are available.
    """
    if self.tokens >= 1:
        return 0.0
    # Calculate time needed to refill 1 token
    tokens_needed = 1 - self.tokens
    return tokens_needed / self.refill_rate

Get recommended delay in seconds before next message would be allowed. Returns 0 if tokens are available.

def reset(self) ‑> None
Expand source code
def reset(self) -> None:
    """Reset bucket to full capacity."""
    self.tokens = float(self.capacity)
    self.last_refill = time.monotonic()

Reset bucket to full capacity.
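
A burst-then-throttle sketch (hedged; the capacity and refill rate are illustrative):

bucket = TokenBucket(capacity=5, refill_rate=1.0)  # 5-message burst, 1 msg/sec sustained

for i in range(7):
    if bucket.consume():
        print(f"message {i}: allowed")
    else:
        print(f"message {i}: limited, retry in ~{bucket.get_delay_seconds():.2f}s")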

class TorAuthenticationError (*args, **kwargs)
Expand source code
class TorAuthenticationError(TorControlError):
    """Authentication with Tor control port failed."""

    pass

Authentication with Tor control port failed.

Ancestors

  • TorControlError
  • builtins.Exception
  • builtins.BaseException
class TorConfig (**data: Any)
Expand source code
class TorConfig(BaseModel):
    """
    Configuration for Tor SOCKS proxy connection.

    Used for outgoing connections to directory servers and peers.
    """

    socks_host: str = Field(default="127.0.0.1", description="Tor SOCKS5 proxy host address")
    socks_port: int = Field(default=9050, ge=1, le=65535, description="Tor SOCKS5 proxy port")

    model_config = {"frozen": False}

Configuration for Tor SOCKS proxy connection.

Used for outgoing connections to directory servers and peers.
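
A minimal sketch (hedged; 9150 is the SOCKS port typically exposed by Tor Browser, whereas the 9050 default targets a system Tor daemon):

proxy = TorConfig(socks_host="127.0.0.1", socks_port=9150)
print(f"socks5://{proxy.socks_host}:{proxy.socks_port}")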

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var model_config
var socks_host : str
var socks_port : int

class TorControlClient (control_host: str = '127.0.0.1',
control_port: int = 9051,
cookie_path: str | Path | None = None,
password: str | None = None)
Expand source code
class TorControlClient:
    """
    Async client for Tor control protocol.

    Supports cookie authentication and ephemeral hidden service creation.
    The client maintains a persistent connection to control port.

    Example:
        async with TorControlClient() as client:
            hs = await client.create_ephemeral_hidden_service(
                ports=[(8765, "127.0.0.1:8765")]
            )
            print(f"Hidden service: {hs.onion_address}")
            # Service exists while connection is open
        # Service removed when context exits
    """

    def __init__(
        self,
        control_host: str = "127.0.0.1",
        control_port: int = 9051,
        cookie_path: str | Path | None = None,
        password: str | None = None,
    ):
        """
        Initialize Tor control client.

        Args:
            control_host: Tor control port host
            control_port: Tor control port number
            cookie_path: Path to cookie auth file (usually /var/lib/tor/control_auth_cookie)
            password: Optional password for HASHEDPASSWORD auth (not recommended)
        """
        self.control_host = control_host
        self.control_port = control_port
        self.cookie_path = Path(cookie_path) if cookie_path else None
        self.password = password

        self._reader: asyncio.StreamReader | None = None
        self._writer: asyncio.StreamWriter | None = None
        self._connected = False
        self._authenticated = False
        self._read_lock = asyncio.Lock()
        self._write_lock = asyncio.Lock()

        # Track created hidden services for cleanup
        self._hidden_services: list[EphemeralHiddenService] = []

    async def __aenter__(self) -> TorControlClient:
        """Async context manager entry - connect and authenticate."""
        await self.connect()
        await self.authenticate()
        return self

    async def __aexit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: object,
    ) -> None:
        """Async context manager exit - close connection."""
        await self.close()

    async def connect(self) -> None:
        """Connect to Tor control port."""
        if self._connected:
            return

        try:
            logger.debug(f"Connecting to Tor control port {self.control_host}:{self.control_port}")
            self._reader, self._writer = await asyncio.wait_for(
                asyncio.open_connection(self.control_host, self.control_port),
                timeout=10.0,
            )
            self._connected = True
            logger.info(f"Connected to Tor control port at {self.control_host}:{self.control_port}")
        except TimeoutError as e:
            raise TorControlError(
                f"Timeout connecting to Tor control port at {self.control_host}:{self.control_port}"
            ) from e
        except OSError as e:
            raise TorControlError(
                f"Failed to connect to Tor control port at "
                f"{self.control_host}:{self.control_port}: {e}"
            ) from e

    async def close(self) -> None:
        """Close connection to Tor control port."""
        if not self._connected:
            return

        self._connected = False
        self._authenticated = False
        self._hidden_services.clear()

        if self._writer:
            try:
                self._writer.close()
                await self._writer.wait_closed()
            except Exception:
                pass
            self._writer = None
        self._reader = None

        logger.debug("Closed Tor control connection")

    async def _send_command(self, command: str) -> None:
        """Send a command to Tor control port."""
        if not self._connected or not self._writer:
            raise TorControlError("Not connected to Tor control port")

        async with self._write_lock:
            logger.trace(f"Tor control send: {command}")
            self._writer.write(f"{command}\r\n".encode())
            await self._writer.drain()

    async def _read_response(self) -> list[tuple[str, str, str]]:
        """
        Read response from Tor control port.

        Returns:
            List of (status_code, separator, message) tuples.
            Separator is '-' for multi-line, ' ' for last/single line, '+' for data.
        """
        if not self._connected or not self._reader:
            raise TorControlError("Not connected to Tor control port")

        responses: list[tuple[str, str, str]] = []

        async with self._read_lock:
            while True:
                try:
                    line = await asyncio.wait_for(self._reader.readline(), timeout=30.0)
                except TimeoutError as e:
                    raise TorControlError("Timeout reading from Tor control port") from e

                if not line:
                    raise TorControlError("Connection closed by Tor")

                line_str = line.decode("utf-8").rstrip("\r\n")
                logger.trace(f"Tor control recv: {line_str}")

                if len(line_str) < 4:
                    raise TorControlError(f"Invalid response format: {line_str}")

                status_code = line_str[:3]
                separator = line_str[3]
                message = line_str[4:]

                responses.append((status_code, separator, message))

                # Handle multi-line data responses (status+data)
                if separator == "+":
                    # Read until we see a line with just "."
                    data_lines: list[str] = []
                    while True:
                        data_line = await self._reader.readline()
                        data_str = data_line.decode("utf-8").rstrip("\r\n")
                        if data_str == ".":
                            break
                        data_lines.append(data_str)
                    # Store data as message content
                    responses[-1] = (status_code, separator, "\n".join(data_lines))

                # Single line or last line of multi-line response
                if separator == " ":
                    break

        return responses

    async def _command(self, command: str) -> list[tuple[str, str, str]]:
        """Send command and read response."""
        await self._send_command(command)
        return await self._read_response()

    def _check_success(
        self, responses: list[tuple[str, str, str]], expected_code: str = "250"
    ) -> None:
        """Check if response indicates success."""
        if not responses:
            raise TorControlError("Empty response from Tor")

        # Check the last response (final status)
        status_code, _, message = responses[-1]
        if status_code != expected_code:
            raise TorControlError(f"Tor command failed: {status_code} {message}")

    async def authenticate(self) -> None:
        """
        Authenticate with Tor control port.

        Tries cookie authentication first if cookie_path is set,
        then falls back to password if provided.
        """
        if self._authenticated:
            return

        if not self._connected:
            await self.connect()

        # Try cookie authentication
        if self.cookie_path:
            await self._authenticate_cookie()
            return

        # Try password authentication
        if self.password:
            await self._authenticate_password()
            return

        # Try null authentication (for permissive configs)
        try:
            responses = await self._command("AUTHENTICATE")
            self._check_success(responses)
            self._authenticated = True
            logger.info("Authenticated with Tor (null auth)")
        except TorControlError as e:
            raise TorAuthenticationError(
                "No authentication method configured. Provide cookie_path or password."
            ) from e

    async def _authenticate_cookie(self) -> None:
        """Authenticate using cookie file."""
        if not self.cookie_path:
            raise TorAuthenticationError("Cookie path not configured")

        try:
            cookie_data = self.cookie_path.read_bytes()
            cookie_hex = cookie_data.hex()
        except FileNotFoundError as e:
            raise TorAuthenticationError(f"Cookie file not found: {self.cookie_path}") from e
        except PermissionError as e:
            raise TorAuthenticationError(
                f"Permission denied reading cookie file: {self.cookie_path}"
            ) from e

        try:
            responses = await self._command(f"AUTHENTICATE {cookie_hex}")
            self._check_success(responses)
            self._authenticated = True
            logger.info("Authenticated with Tor using cookie")
        except TorControlError as e:
            raise TorAuthenticationError(f"Cookie authentication failed: {e}") from e

    async def _authenticate_password(self) -> None:
        """Authenticate using password."""
        if not self.password:
            raise TorAuthenticationError("Password not configured")

        # Quote the password properly
        escaped_password = self.password.replace("\\", "\\\\").replace('"', '\\"')

        try:
            responses = await self._command(f'AUTHENTICATE "{escaped_password}"')
            self._check_success(responses)
            self._authenticated = True
            logger.info("Authenticated with Tor using password")
        except TorControlError as e:
            raise TorAuthenticationError(f"Password authentication failed: {e}") from e

    async def get_info(self, key: str) -> str:
        """
        Get information from Tor.

        Args:
            key: Information key (e.g., "version", "config-file")

        Returns:
            The requested information value
        """
        if not self._authenticated:
            raise TorControlError("Not authenticated")

        responses = await self._command(f"GETINFO {key}")
        self._check_success(responses)

        # Parse key=value from first response
        for status, _, message in responses:
            if status == "250" and "=" in message:
                _, value = message.split("=", 1)
                return value

        raise TorControlError(f"Could not parse GETINFO response for {key}")

    async def create_ephemeral_hidden_service(
        self,
        ports: list[tuple[int, str]],
        key_type: str = "NEW",
        key_blob: str = "ED25519-V3",
        discard_pk: bool = False,
        detach: bool = False,
        await_publication: bool = False,
        max_streams: int | None = None,
    ) -> EphemeralHiddenService:
        """
        Create an ephemeral hidden service using ADD_ONION.

        Ephemeral services exist only while the control connection is open.
        When the connection closes, the hidden service is automatically removed.

        Args:
            ports: List of (virtual_port, target) tuples.
                   Target is "host:port" or just "port" for localhost.
            key_type: "NEW" for new key, "ED25519-V3" or "RSA1024" for existing key
            key_blob: For NEW: "ED25519-V3" (recommended) or "RSA1024"
                      For existing: base64-encoded private key
            discard_pk: If True, don't return the private key
            detach: If True, service persists after control connection closes
            await_publication: If True, wait for HS descriptor to be published
            max_streams: Maximum concurrent streams (None for unlimited)

        Returns:
            EphemeralHiddenService with the created service details

        Example:
            # Create service that forwards port 80 to local 8080
            hs = await client.create_ephemeral_hidden_service(
                ports=[(80, "127.0.0.1:8080")]
            )
        """
        if not self._authenticated:
            raise TorControlError("Not authenticated")

        # Build port specifications
        port_specs = []
        for virtual_port, target in ports:
            port_specs.append(f"Port={virtual_port},{target}")

        # Build flags
        flags = []
        if discard_pk:
            flags.append("DiscardPK")
        if detach:
            flags.append("Detach")
        if await_publication:
            flags.append("AwaitPublication")

        # Build command
        cmd_parts = [f"ADD_ONION {key_type}:{key_blob}"]
        cmd_parts.extend(port_specs)

        if flags:
            cmd_parts.append(f"Flags={','.join(flags)}")

        if max_streams is not None:
            cmd_parts.append(f"MaxStreams={max_streams}")

        command = " ".join(cmd_parts)

        try:
            responses = await self._command(command)
            self._check_success(responses)
        except TorControlError as e:
            raise TorHiddenServiceError(f"Failed to create hidden service: {e}") from e

        # Parse response to get service ID and optional private key
        service_id: str | None = None
        private_key: str | None = None

        for status, _, message in responses:
            if status == "250":
                if message.startswith("ServiceID="):
                    service_id = message.split("=", 1)[1]
                elif message.startswith("PrivateKey="):
                    private_key = message.split("=", 1)[1]

        if not service_id:
            raise TorHiddenServiceError("No ServiceID in ADD_ONION response")

        hs = EphemeralHiddenService(
            service_id=service_id,
            private_key=private_key,
            ports=list(ports),
        )

        if not detach:
            self._hidden_services.append(hs)

        logger.info(f"Created ephemeral hidden service: {hs.onion_address}")
        return hs

    async def delete_ephemeral_hidden_service(self, service_id: str) -> None:
        """
        Delete an ephemeral hidden service.

        Args:
            service_id: The service ID (without .onion suffix)
        """
        if not self._authenticated:
            raise TorControlError("Not authenticated")

        # Strip .onion if included
        if service_id.endswith(".onion"):
            service_id = service_id[:-6]

        try:
            responses = await self._command(f"DEL_ONION {service_id}")
            self._check_success(responses)
            logger.info(f"Deleted hidden service: {service_id}")
        except TorControlError as e:
            raise TorHiddenServiceError(f"Failed to delete hidden service: {e}") from e

        # Remove from tracking
        self._hidden_services = [hs for hs in self._hidden_services if hs.service_id != service_id]

    async def get_version(self) -> str:
        """Get Tor version string."""
        return await self.get_info("version")

    @property
    def is_connected(self) -> bool:
        """Check if connected to control port."""
        return self._connected

    @property
    def is_authenticated(self) -> bool:
        """Check if authenticated."""
        return self._authenticated

    @property
    def hidden_services(self) -> list[EphemeralHiddenService]:
        """Get list of active ephemeral hidden services created by this client."""
        return list(self._hidden_services)

Async client for Tor control protocol.

Supports cookie authentication and ephemeral hidden service creation. The client maintains a persistent connection to the control port.

Example

async with TorControlClient() as client:
    hs = await client.create_ephemeral_hidden_service(
        ports=[(8765, "127.0.0.1:8765")]
    )
    print(f"Hidden service: {hs.onion_address}")
    # Service exists while connection is open
# Service removed when context exits

Initialize Tor control client.

Args

control_host
Tor control port host
control_port
Tor control port number
cookie_path
Path to cookie auth file (usually /var/lib/tor/control_auth_cookie)
password
Optional password for HASHEDPASSWORD auth (not recommended)

Instance variables

prop hidden_services : list[EphemeralHiddenService]
Expand source code
@property
def hidden_services(self) -> list[EphemeralHiddenService]:
    """Get list of active ephemeral hidden services created by this client."""
    return list(self._hidden_services)

Get list of active ephemeral hidden services created by this client.

prop is_authenticated : bool
Expand source code
@property
def is_authenticated(self) -> bool:
    """Check if authenticated."""
    return self._authenticated

Check if authenticated.

prop is_connected : bool
Expand source code
@property
def is_connected(self) -> bool:
    """Check if connected to control port."""
    return self._connected

Check if connected to control port.

Methods

async def authenticate(self) ‑> None
Expand source code
async def authenticate(self) -> None:
    """
    Authenticate with Tor control port.

    Tries cookie authentication first if cookie_path is set,
    then falls back to password if provided.
    """
    if self._authenticated:
        return

    if not self._connected:
        await self.connect()

    # Try cookie authentication
    if self.cookie_path:
        await self._authenticate_cookie()
        return

    # Try password authentication
    if self.password:
        await self._authenticate_password()
        return

    # Try null authentication (for permissive configs)
    try:
        responses = await self._command("AUTHENTICATE")
        self._check_success(responses)
        self._authenticated = True
        logger.info("Authenticated with Tor (null auth)")
    except TorControlError as e:
        raise TorAuthenticationError(
            "No authentication method configured. Provide cookie_path or password."
        ) from e

Authenticate with Tor control port.

Tries cookie authentication first if cookie_path is set, then falls back to password if provided.

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Close connection to Tor control port."""
    if not self._connected:
        return

    self._connected = False
    self._authenticated = False
    self._hidden_services.clear()

    if self._writer:
        try:
            self._writer.close()
            await self._writer.wait_closed()
        except Exception:
            pass
        self._writer = None
    self._reader = None

    logger.debug("Closed Tor control connection")

Close connection to Tor control port.

async def connect(self) ‑> None
Expand source code
async def connect(self) -> None:
    """Connect to Tor control port."""
    if self._connected:
        return

    try:
        logger.debug(f"Connecting to Tor control port {self.control_host}:{self.control_port}")
        self._reader, self._writer = await asyncio.wait_for(
            asyncio.open_connection(self.control_host, self.control_port),
            timeout=10.0,
        )
        self._connected = True
        logger.info(f"Connected to Tor control port at {self.control_host}:{self.control_port}")
    except TimeoutError as e:
        raise TorControlError(
            f"Timeout connecting to Tor control port at {self.control_host}:{self.control_port}"
        ) from e
    except OSError as e:
        raise TorControlError(
            f"Failed to connect to Tor control port at "
            f"{self.control_host}:{self.control_port}: {e}"
        ) from e

Connect to Tor control port.

async def create_ephemeral_hidden_service(self,
ports: list[tuple[int, str]],
key_type: str = 'NEW',
key_blob: str = 'ED25519-V3',
discard_pk: bool = False,
detach: bool = False,
await_publication: bool = False,
max_streams: int | None = None) ‑> EphemeralHiddenService
Expand source code
async def create_ephemeral_hidden_service(
    self,
    ports: list[tuple[int, str]],
    key_type: str = "NEW",
    key_blob: str = "ED25519-V3",
    discard_pk: bool = False,
    detach: bool = False,
    await_publication: bool = False,
    max_streams: int | None = None,
) -> EphemeralHiddenService:
    """
    Create an ephemeral hidden service using ADD_ONION.

    Ephemeral services exist only while the control connection is open.
    When the connection closes, the hidden service is automatically removed.

    Args:
        ports: List of (virtual_port, target) tuples.
               Target is "host:port" or just "port" for localhost.
        key_type: "NEW" for new key, "ED25519-V3" or "RSA1024" for existing key
        key_blob: For NEW: "ED25519-V3" (recommended) or "RSA1024"
                  For existing: base64-encoded private key
        discard_pk: If True, don't return the private key
        detach: If True, service persists after control connection closes
        await_publication: If True, wait for HS descriptor to be published
        max_streams: Maximum concurrent streams (None for unlimited)

    Returns:
        EphemeralHiddenService with the created service details

    Example:
        # Create service that forwards port 80 to local 8080
        hs = await client.create_ephemeral_hidden_service(
            ports=[(80, "127.0.0.1:8080")]
        )
    """
    if not self._authenticated:
        raise TorControlError("Not authenticated")

    # Build port specifications
    port_specs = []
    for virtual_port, target in ports:
        port_specs.append(f"Port={virtual_port},{target}")

    # Build flags
    flags = []
    if discard_pk:
        flags.append("DiscardPK")
    if detach:
        flags.append("Detach")
    if await_publication:
        flags.append("AwaitPublication")

    # Build command
    cmd_parts = [f"ADD_ONION {key_type}:{key_blob}"]
    cmd_parts.extend(port_specs)

    if flags:
        cmd_parts.append(f"Flags={','.join(flags)}")

    if max_streams is not None:
        cmd_parts.append(f"MaxStreams={max_streams}")

    command = " ".join(cmd_parts)

    try:
        responses = await self._command(command)
        self._check_success(responses)
    except TorControlError as e:
        raise TorHiddenServiceError(f"Failed to create hidden service: {e}") from e

    # Parse response to get service ID and optional private key
    service_id: str | None = None
    private_key: str | None = None

    for status, _, message in responses:
        if status == "250":
            if message.startswith("ServiceID="):
                service_id = message.split("=", 1)[1]
            elif message.startswith("PrivateKey="):
                private_key = message.split("=", 1)[1]

    if not service_id:
        raise TorHiddenServiceError("No ServiceID in ADD_ONION response")

    hs = EphemeralHiddenService(
        service_id=service_id,
        private_key=private_key,
        ports=list(ports),
    )

    if not detach:
        self._hidden_services.append(hs)

    logger.info(f"Created ephemeral hidden service: {hs.onion_address}")
    return hs

Create an ephemeral hidden service using ADD_ONION.

Ephemeral services exist only while the control connection is open. When the connection closes, the hidden service is automatically removed.

Args

ports
List of (virtual_port, target) tuples. Target is "host:port" or just "port" for localhost.
key_type
"NEW" for new key, "ED25519-V3" or "RSA1024" for existing key
key_blob
For NEW: "ED25519-V3" (recommended) or "RSA1024" For existing: base64-encoded private key
discard_pk
If True, don't return the private key
detach
If True, service persists after control connection closes
await_publication
If True, wait for HS descriptor to be published
max_streams
Maximum concurrent streams (None for unlimited)

Returns

EphemeralHiddenService with the created service details

Example

# Create service that forwards port 80 to local 8080
hs = await client.create_ephemeral_hidden_service(
    ports=[(80, "127.0.0.1:8080")]
)

async def delete_ephemeral_hidden_service(self, service_id: str) ‑> None
Expand source code
async def delete_ephemeral_hidden_service(self, service_id: str) -> None:
    """
    Delete an ephemeral hidden service.

    Args:
        service_id: The service ID (without .onion suffix)
    """
    if not self._authenticated:
        raise TorControlError("Not authenticated")

    # Strip .onion if included
    if service_id.endswith(".onion"):
        service_id = service_id[:-6]

    try:
        responses = await self._command(f"DEL_ONION {service_id}")
        self._check_success(responses)
        logger.info(f"Deleted hidden service: {service_id}")
    except TorControlError as e:
        raise TorHiddenServiceError(f"Failed to delete hidden service: {e}") from e

    # Remove from tracking
    self._hidden_services = [hs for hs in self._hidden_services if hs.service_id != service_id]

Delete an ephemeral hidden service.

Args

service_id
The service ID (without .onion suffix)
async def get_info(self, key: str) ‑> str
Expand source code
async def get_info(self, key: str) -> str:
    """
    Get information from Tor.

    Args:
        key: Information key (e.g., "version", "config-file")

    Returns:
        The requested information value
    """
    if not self._authenticated:
        raise TorControlError("Not authenticated")

    responses = await self._command(f"GETINFO {key}")
    self._check_success(responses)

    # Parse key=value from first response
    for status, _, message in responses:
        if status == "250" and "=" in message:
            _, value = message.split("=", 1)
            return value

    raise TorControlError(f"Could not parse GETINFO response for {key}")

Get information from Tor.

Args

key
Information key (e.g., "version", "config-file")

Returns

The requested information value

async def get_version(self) ‑> str
Expand source code
async def get_version(self) -> str:
    """Get Tor version string."""
    return await self.get_info("version")

Get Tor version string.
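
A cookie-authenticated end-to-end sketch combining the calls above (hedged; the cookie path is the conventional Debian location and the port mapping is illustrative):

import asyncio

async def main() -> None:
    async with TorControlClient(cookie_path="/var/lib/tor/control_auth_cookie") as client:
        print("Tor version:", await client.get_version())

        hs = await client.create_ephemeral_hidden_service(
            ports=[(5222, "127.0.0.1:5222")]
        )
        print("serving at", hs.onion_address)

        # ... serve traffic while the control connection stays open ...

        await client.delete_ephemeral_hidden_service(hs.service_id)

asyncio.run(main())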

class TorControlConfig (**data: Any)
Expand source code
class TorControlConfig(BaseModel):
    """
    Configuration for Tor control port connection.

    When enabled, allows dynamic creation of ephemeral hidden services
    at startup using Tor's control port. This allows generating a new
    .onion address each time without needing to pre-configure the hidden
    service in torrc.

    Requires Tor to be configured with:
        ControlPort 127.0.0.1:9051
        CookieAuthentication 1
        CookieAuthFile /var/lib/tor/control_auth_cookie

    Auto-detects configuration from environment variables:
        TOR_CONTROL_HOST - Tor control host (default: 127.0.0.1)
        TOR_CONTROL_PORT - Tor control port (default: 9051)
        TOR_COOKIE_PATH - Cookie auth file path
        TOR_PASSWORD - Tor control password (not recommended)
    """

    enabled: bool = Field(default=True, description="Enable Tor control port integration")
    host: str = Field(default="127.0.0.1", description="Tor control port host")
    port: int = Field(default=9051, ge=1, le=65535, description="Tor control port")
    cookie_path: Path | None = Field(
        default=None,
        description="Path to Tor cookie auth file (e.g., /var/lib/tor/control_auth_cookie)",
    )
    password: str | None = Field(
        default=None,
        description="Password for HASHEDPASSWORD auth (not recommended, use cookie auth)",
    )

    model_config = {"frozen": False}

Configuration for Tor control port connection.

When enabled, allows dynamic creation of ephemeral hidden services at startup using Tor's control port. This allows generating a new .onion address each time without needing to pre-configure the hidden service in torrc.

Requires Tor to be configured with:

    ControlPort 127.0.0.1:9051
    CookieAuthentication 1
    CookieAuthFile /var/lib/tor/control_auth_cookie

Auto-detects configuration from environment variables:

  • TOR_CONTROL_HOST - Tor control host (default: 127.0.0.1)
  • TOR_CONTROL_PORT - Tor control port (default: 9051)
  • TOR_COOKIE_PATH - Cookie auth file path
  • TOR_PASSWORD - Tor control password (not recommended)
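A manual sketch of that detection (hedged; the library may provide its own helper for this, and the snippet only shows how the documented variables map onto the model's fields):

import os
from pathlib import Path

cookie = os.environ.get("TOR_COOKIE_PATH")
config = TorControlConfig(
    host=os.environ.get("TOR_CONTROL_HOST", "127.0.0.1"),
    port=int(os.environ.get("TOR_CONTROL_PORT", "9051")),
    cookie_path=Path(cookie) if cookie else None,
    password=os.environ.get("TOR_PASSWORD"),
)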

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var cookie_path : pathlib.Path | None
var enabled : bool
var host : str
var model_config
var password : str | None
var port : int

class TorControlError (*args, **kwargs)
Expand source code
class TorControlError(Exception):
    """Base exception for Tor control errors."""

    pass

Base exception for Tor control errors.

Ancestors

  • builtins.Exception
  • builtins.BaseException

Subclasses

  • TorAuthenticationError
  • TorHiddenServiceError
class TorHiddenServiceError (*args, **kwargs)
Expand source code
class TorHiddenServiceError(TorControlError):
    """Failed to create or manage hidden service."""

    pass

Failed to create or manage hidden service.

Ancestors

  • TorControlError
  • builtins.Exception
  • builtins.BaseException
class TxInput (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class TxInput:
    """Transaction input."""

    txid: str  # In RPC format (big-endian hex)
    vout: int
    value: int = 0
    scriptpubkey: str = ""
    scriptsig: str = ""
    sequence: int = 0xFFFFFFFF

Transaction input.

Instance variables

var scriptpubkey : str
var scriptsig : str
var sequence : int
var txid : str
var value : int
var vout : int

class TxOutput (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class TxOutput:
    """Transaction output."""

    address: str
    value: int
    scriptpubkey: str = ""

Transaction output.

Instance variables

var address : str
var scriptpubkey : str
var value : int

class UTXOMetadata (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class UTXOMetadata:
    """
    Extended UTXO metadata for Neutrino-compatible verification.

    This allows light clients to verify UTXOs without arbitrary blockchain queries
    by providing the scriptPubKey (for Neutrino watch list) and block height
    (for efficient rescan starting point).
    """

    txid: str
    vout: int
    scriptpubkey: str | None = None  # Hex-encoded scriptPubKey
    blockheight: int | None = None  # Block height where UTXO was confirmed

    def to_legacy_str(self) -> str:
        """Format as legacy string: txid:vout"""
        return f"{self.txid}:{self.vout}"

    def to_extended_str(self) -> str:
        """Format as extended string: txid:vout:scriptpubkey:blockheight"""
        if self.scriptpubkey is None or self.blockheight is None:
            return self.to_legacy_str()
        return f"{self.txid}:{self.vout}:{self.scriptpubkey}:{self.blockheight}"

    @classmethod
    def from_str(cls, s: str) -> UTXOMetadata:
        """
        Parse UTXO string in either legacy or extended format.

        Legacy format: txid:vout
        Extended format: txid:vout:scriptpubkey:blockheight
        """
        parts = s.split(":")
        if len(parts) == 2:
            # Legacy format
            return cls(txid=parts[0], vout=int(parts[1]))
        elif len(parts) == 4:
            # Extended format
            return cls(
                txid=parts[0],
                vout=int(parts[1]),
                scriptpubkey=parts[2],
                blockheight=int(parts[3]),
            )
        else:
            raise ValueError(f"Invalid UTXO format: {s}")

    def has_neutrino_metadata(self) -> bool:
        """Check if this UTXO has the metadata needed for Neutrino verification."""
        return self.scriptpubkey is not None and self.blockheight is not None

    @staticmethod
    def is_valid_scriptpubkey(scriptpubkey: str) -> bool:
        """Validate scriptPubKey format (hex string)."""
        if not scriptpubkey:
            return False
        # Must be valid hex
        if not re.match(r"^[0-9a-fA-F]+$", scriptpubkey):
            return False
        # Common scriptPubKey lengths (in hex chars):
        # P2PKH: 50 (25 bytes), P2SH: 46 (23 bytes)
        # P2WPKH: 44 (22 bytes), P2WSH: 68 (34 bytes)
        # P2TR: 68 (34 bytes)
        return not (len(scriptpubkey) < 4 or len(scriptpubkey) > 200)

Extended UTXO metadata for Neutrino-compatible verification.

This allows light clients to verify UTXOs without arbitrary blockchain queries by providing the scriptPubKey (for Neutrino watch list) and block height (for efficient rescan starting point).

Static methods

def from_str(s: str) ‑> UTXOMetadata

Parse UTXO string in either legacy or extended format.

Legacy format: txid:vout
Extended format: txid:vout:scriptpubkey:blockheight

def is_valid_scriptpubkey(scriptpubkey: str) ‑> bool
Expand source code
@staticmethod
def is_valid_scriptpubkey(scriptpubkey: str) -> bool:
    """Validate scriptPubKey format (hex string)."""
    if not scriptpubkey:
        return False
    # Must be valid hex
    if not re.match(r"^[0-9a-fA-F]+$", scriptpubkey):
        return False
    # Common scriptPubKey lengths (in hex chars):
    # P2PKH: 50 (25 bytes), P2SH: 46 (23 bytes)
    # P2WPKH: 44 (22 bytes), P2WSH: 68 (34 bytes)
    # P2TR: 68 (34 bytes)
    return not (len(scriptpubkey) < 4 or len(scriptpubkey) > 200)

Validate scriptPubKey format (hex string).

Instance variables

var blockheight : int | None

Block height where the UTXO was confirmed.

var scriptpubkey : str | None

Hex-encoded scriptPubKey.

var txid : str

var vout : int

Methods

def has_neutrino_metadata(self) ‑> bool
Expand source code
def has_neutrino_metadata(self) -> bool:
    """Check if this UTXO has the metadata needed for Neutrino verification."""
    return self.scriptpubkey is not None and self.blockheight is not None

Check if this UTXO has the metadata needed for Neutrino verification.

def to_extended_str(self) ‑> str
Expand source code
def to_extended_str(self) -> str:
    """Format as extended string: txid:vout:scriptpubkey:blockheight"""
    if self.scriptpubkey is None or self.blockheight is None:
        return self.to_legacy_str()
    return f"{self.txid}:{self.vout}:{self.scriptpubkey}:{self.blockheight}"

Format as extended string: txid:vout:scriptpubkey:blockheight

def to_legacy_str(self) ‑> str
Expand source code
def to_legacy_str(self) -> str:
    """Format as legacy string: txid:vout"""
    return f"{self.txid}:{self.vout}"

Format as legacy string: txid:vout
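A short usage sketch grounded in the methods above; the import path is an assumption, and the txid and scriptPubKey are placeholders.

from jmcore.models import UTXOMetadata  # import path assumed

# Legacy form carries only the outpoint; extended form adds Neutrino metadata.
legacy = UTXOMetadata.from_str(
    "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855:1"
)
assert not legacy.has_neutrino_metadata()
assert legacy.to_extended_str() == legacy.to_legacy_str()  # falls back without metadata

extended = UTXOMetadata(
    txid=legacy.txid,
    vout=legacy.vout,
    scriptpubkey="0014" + "ab" * 20,  # placeholder P2WPKH scriptPubKey (22 bytes)
    blockheight=820_000,
)
assert extended.has_neutrino_metadata()
assert UTXOMetadata.is_valid_scriptpubkey(extended.scriptpubkey)
round_tripped = UTXOMetadata.from_str(extended.to_extended_str())
assert round_tripped.blockheight == 820_000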

class WalletConfig (**data: Any)
Expand source code
class WalletConfig(BaseModel):
    """
    Base wallet configuration shared by all JoinMarket wallet users.

    Includes wallet seed, network settings, HD wallet structure, and
    backend connection details.
    """

    # Wallet seed
    mnemonic: str = Field(..., description="BIP39 mnemonic phrase for wallet seed")

    # Network settings
    network: NetworkType = Field(
        default=NetworkType.MAINNET,
        description="Protocol network for directory server handshakes",
    )
    bitcoin_network: NetworkType | None = Field(
        default=None,
        description="Bitcoin network for address generation (defaults to same as network)",
    )

    # Data directory
    data_dir: Path | None = Field(
        default=None,
        description=(
            "Data directory for JoinMarket files (commitment blacklist, history, etc.). "
            "Defaults to ~/.joinmarket-ng or $JOINMARKET_DATA_DIR if set"
        ),
    )

    # Backend configuration
    backend_type: str = Field(
        default="full_node",
        description="Backend type: 'full_node' or 'neutrino'",
    )
    backend_config: dict[str, Any] = Field(
        default_factory=dict,
        description="Backend-specific configuration",
    )

    # Directory servers
    directory_servers: list[str] = Field(
        default_factory=list,
        description="List of directory server URLs (e.g., ['onion_host:port', ...])",
    )

    # Tor/SOCKS configuration
    socks_host: str = Field(default="127.0.0.1", description="Tor SOCKS5 proxy host")
    socks_port: int = Field(default=9050, ge=1, le=65535, description="Tor SOCKS5 proxy port")

    # HD wallet structure
    mixdepth_count: int = Field(
        default=5,
        ge=1,
        le=10,
        description="Number of mixdepths in the wallet (privacy compartments)",
    )
    gap_limit: int = Field(default=20, ge=6, description="BIP44 gap limit for address scanning")

    # Dust threshold
    dust_threshold: int = Field(
        default=DUST_THRESHOLD,
        ge=0,
        description="Dust threshold in satoshis for change outputs (default: 27300)",
    )

    model_config = {"frozen": False}

    @model_validator(mode="after")
    def set_bitcoin_network_default(self) -> WalletConfig:
        """If bitcoin_network is not set, default to the protocol network."""
        if self.bitcoin_network is None:
            object.__setattr__(self, "bitcoin_network", self.network)
        return self

Base wallet configuration shared by all JoinMarket wallet users.

Includes wallet seed, network settings, HD wallet structure, and backend connection details.

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Subclasses

Class variables

var backend_config : dict[str, typing.Any]

Backend-specific configuration.

var backend_type : str

Backend type: 'full_node' or 'neutrino'.

var bitcoin_network : NetworkType | None

Bitcoin network for address generation (defaults to same as network).

var data_dir : pathlib.Path | None

Data directory for JoinMarket files (commitment blacklist, history, etc.). Defaults to ~/.joinmarket-ng or $JOINMARKET_DATA_DIR if set.

var directory_servers : list[str]

List of directory server URLs (e.g., ['onion_host:port', ...]).

var dust_threshold : int

Dust threshold in satoshis for change outputs (default: 27300).

var gap_limit : int

BIP44 gap limit for address scanning.

var mixdepth_count : int

Number of mixdepths in the wallet (privacy compartments).

var mnemonic : str

BIP39 mnemonic phrase for wallet seed.

var model_config

var network : NetworkType

Protocol network for directory server handshakes.

var socks_host : str

Tor SOCKS5 proxy host.

var socks_port : int

Tor SOCKS5 proxy port.

Methods

def set_bitcoin_network_default(self) ‑> WalletConfig
Expand source code
@model_validator(mode="after")
def set_bitcoin_network_default(self) -> WalletConfig:
    """If bitcoin_network is not set, default to the protocol network."""
    if self.bitcoin_network is None:
        object.__setattr__(self, "bitcoin_network", self.network)
    return self

If bitcoin_network is not set, default to the protocol network.
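
A minimal construction sketch, assuming WalletConfig and NetworkType are importable from jmcore.config; the mnemonic is the well-known BIP39 test vector, not a wallet to use with real funds.

from jmcore.config import NetworkType, WalletConfig  # import path assumed

cfg = WalletConfig(
    mnemonic=(
        "abandon abandon abandon abandon abandon abandon "
        "abandon abandon abandon abandon abandon about"
    ),
)

# The after-model validator copies the protocol network when bitcoin_network is unset.
assert cfg.network == NetworkType.MAINNET          # default
assert cfg.bitcoin_network == cfg.network
assert cfg.mixdepth_count == 5 and cfg.gap_limit == 20
assert (cfg.socks_host, cfg.socks_port) == ("127.0.0.1", 9050)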