Package maker

JoinMarket Yield Generator (Maker Bot).

Sub-modules

maker.bot

Main maker bot implementation …

maker.cli

Maker bot CLI using Typer.

maker.coinjoin

CoinJoin protocol handler for makers …

maker.config

Maker bot configuration.

maker.fidelity

Fidelity bond utilities for maker bot.

maker.offers

Offer management for makers …

maker.podle

Proof of Discrete Log Equivalence (PoDLE) verification for makers …

maker.tx_verification

Transaction verification for makers …

Classes

class MakerBot (wallet: WalletService,
backend: BlockchainBackend,
config: MakerConfig)
Expand source code
class MakerBot:
    """
    Main maker bot coordinating all components.
    """

    def __init__(
        self,
        wallet: WalletService,
        backend: BlockchainBackend,
        config: MakerConfig,
    ):
        """Wire together all maker components around the given wallet, backend and config."""
        self.wallet = wallet
        self.backend = backend
        self.config = config

        # A fresh nick identity is created per run; it signs our protocol messages.
        self.nick_identity = NickIdentity(JM_VERSION)
        self.nick = self.nick_identity.nick

        self.offer_manager = OfferManager(self.wallet, config, self.nick)

        # Connection and session bookkeeping.
        self.directory_clients: dict[str, DirectoryClient] = {}
        self.active_sessions: dict[str, CoinJoinSession] = {}

        # Offer / fidelity-bond state.
        self.current_offers: list[Offer] = []
        self.fidelity_bond: FidelityBondInfo | None = None
        # Cached chain tip, refreshed on sync; used when generating bond proofs.
        self.current_block_height: int = 0

        # Lifecycle flag plus the background tasks we spawn in start().
        self.running = False
        self.listen_tasks: list[asyncio.Task[None]] = []

        # Inbound direct peer connections arrive through the hidden service listener.
        self.hidden_service_listener: HiddenServiceListener | None = None
        self.direct_connections: dict[str, TCPConnection] = {}

        # State for the dynamically created (ephemeral) Tor hidden service.
        self._tor_control: TorControlClient | None = None
        self._ephemeral_hidden_service: EphemeralHiddenService | None = None

        # Per-peer token-bucket limiter: generous burst, low sustained rate.
        self._message_rate_limiter = RateLimiter(
            rate_limit=config.message_rate_limit,
            burst_limit=config.message_burst_limit,
        )

        # Dedicated limiter guarding orderbook requests against spam attacks.
        self._orderbook_rate_limiter = OrderbookRateLimiter(
            rate_limit=config.orderbook_rate_limit,
            interval=config.orderbook_rate_interval,
            violation_ban_threshold=config.orderbook_violation_ban_threshold,
            violation_warning_threshold=config.orderbook_violation_warning_threshold,
            violation_severe_threshold=config.orderbook_violation_severe_threshold,
            ban_duration=config.orderbook_ban_duration,
        )

    async def _setup_tor_hidden_service(self) -> str | None:
        """
        Create an ephemeral hidden service via Tor control port.

        Connects and authenticates to the configured control port, then asks
        Tor for an ephemeral onion service mapping the advertised port to our
        local serving host/port. With detach=False the service lives only as
        long as this control connection.

        Returns:
            The .onion address if successful, None otherwise (also when
            tor_control integration is disabled in config)
        """
        if not self.config.tor_control.enabled:
            logger.debug("Tor control port integration disabled")
            return None

        # NOTE(review): only TorControlError is handled below; a raw socket
        # failure (e.g. ConnectionRefusedError) would propagate to start() —
        # confirm TorControlClient wraps OS-level errors.
        try:
            logger.info(
                f"Connecting to Tor control port at "
                f"{self.config.tor_control.host}:{self.config.tor_control.port}..."
            )

            self._tor_control = TorControlClient(
                control_host=self.config.tor_control.host,
                control_port=self.config.tor_control.port,
                cookie_path=self.config.tor_control.cookie_path,
                password=self.config.tor_control.password,
            )

            await self._tor_control.connect()
            await self._tor_control.authenticate()

            # Get Tor version for logging
            try:
                tor_version = await self._tor_control.get_version()
                logger.info(f"Connected to Tor {tor_version}")
            except TorControlError:
                logger.debug("Could not get Tor version (non-critical)")

            # Create ephemeral hidden service
            # Maps external port (advertised) to our local serving port
            logger.info(
                f"Creating ephemeral hidden service on port {self.config.onion_serving_port} -> "
                f"{self.config.onion_serving_host}:{self.config.onion_serving_port}..."
            )

            self._ephemeral_hidden_service = (
                await self._tor_control.create_ephemeral_hidden_service(
                    ports=[
                        (
                            self.config.onion_serving_port,
                            f"{self.config.onion_serving_host}:{self.config.onion_serving_port}",
                        )
                    ],
                    # Discard the private key: the service is ephemeral and is
                    # recreated from scratch on every start, so it is never reused
                    discard_pk=True,
                    # Don't detach - we want the service to be removed when we disconnect
                    detach=False,
                )
            )

            logger.info(
                f"✓ Created ephemeral hidden service: "
                f"{self._ephemeral_hidden_service.onion_address}"
            )
            return self._ephemeral_hidden_service.onion_address

        except TorControlError as e:
            logger.warning(
                f"Could not create ephemeral hidden service via Tor control port: {e}\n"
                f"  Tor control configured: "
                f"{self.config.tor_control.host}:{self.config.tor_control.port}\n"
                f"  Cookie path: {self.config.tor_control.cookie_path}\n"
                f"  → Maker will advertise 'NOT-SERVING-ONION' and rely on directory routing.\n"
                f"  → For better privacy, ensure Tor is running with control port enabled:\n"
                f"     ControlPort {self.config.tor_control.port}\n"
                f"     CookieAuthentication 1"
            )
            # Clean up partial connection
            if self._tor_control:
                await self._tor_control.close()
                self._tor_control = None
            return None

    async def _cleanup_tor_hidden_service(self) -> None:
        """Tear down the Tor control connection; Tor removes the ephemeral service itself."""
        if not self._tor_control:
            return
        try:
            await self._tor_control.close()
            logger.debug("Closed Tor control connection")
        except Exception as e:
            logger.warning(f"Error closing Tor control connection: {e}")
        self._tor_control = None
        self._ephemeral_hidden_service = None

    async def start(self) -> None:
        """
        Start the maker bot.

        Flow:
        1. Initialize commitment blacklist
        2. Sync wallet with blockchain
        3. Create ephemeral hidden service if tor_control enabled
        4. Connect to directory servers
        5. Create and announce offers
        6. Listen for taker requests

        Returns without raising when startup cannot proceed (no offers after
        the retry window, or no directory server reachable); any unexpected
        error is logged and re-raised.
        """
        try:
            logger.info(f"Starting maker bot (nick: {self.nick})")

            # Initialize commitment blacklist with configured data directory
            set_blacklist_path(data_dir=self.config.data_dir)

            # Load fidelity bond addresses for optimized scanning
            # We scan wallet + fidelity bonds in a single pass to avoid two separate
            # scantxoutset calls (which take ~90s each on mainnet)
            from jmcore.paths import get_default_data_dir
            from jmwallet.wallet.bond_registry import load_registry

            resolved_data_dir = (
                self.config.data_dir if self.config.data_dir else get_default_data_dir()
            )
            # Tuples of (address, locktime, derivation index) handed to sync_all().
            fidelity_bond_addresses: list[tuple[str, int, int]] = []

            # Option 1: Manual specification via fidelity_bond_index + locktimes (bypasses registry)
            # This is useful when running in Docker or when you don't have a registry yet
            if self.config.fidelity_bond_index is not None and self.config.fidelity_bond_locktimes:
                logger.info(
                    f"Using manual fidelity bond specification: "
                    f"index={self.config.fidelity_bond_index}, "
                    f"locktimes={self.config.fidelity_bond_locktimes}"
                )
                for locktime in self.config.fidelity_bond_locktimes:
                    address = self.wallet.get_fidelity_bond_address(
                        self.config.fidelity_bond_index, locktime
                    )
                    fidelity_bond_addresses.append(
                        (address, locktime, self.config.fidelity_bond_index)
                    )
                    logger.info(
                        f"Generated fidelity bond address for locktime {locktime}: {address}"
                    )
            # Option 2: Load from registry (default)
            else:
                bond_registry = load_registry(resolved_data_dir)
                if bond_registry.bonds:
                    # Extract (address, locktime, index) tuples from registry
                    fidelity_bond_addresses = [
                        (bond.address, bond.locktime, bond.index) for bond in bond_registry.bonds
                    ]
                    logger.info(
                        f"Loaded {len(fidelity_bond_addresses)} "
                        f"fidelity bond address(es) from registry"
                    )

            logger.info("Syncing wallet and fidelity bonds...")
            await self.wallet.sync_all(fidelity_bond_addresses)

            # Update bond registry with UTXO info from the scan (only if using registry)
            if self.config.fidelity_bond_index is None and fidelity_bond_addresses:
                from jmwallet.wallet.bond_registry import save_registry

                bond_registry = load_registry(resolved_data_dir)
                for bond in bond_registry.bonds:
                    # Find the UTXO for this bond address in mixdepth 0
                    bond_utxo = next(
                        (
                            utxo
                            for utxo in self.wallet.utxo_cache.get(0, [])
                            if utxo.address == bond.address
                        ),
                        None,
                    )
                    if bond_utxo:
                        # Update the bond registry with UTXO info
                        bond.txid = bond_utxo.txid
                        bond.vout = bond_utxo.vout
                        bond.value = bond_utxo.value
                        bond.confirmations = bond_utxo.confirmations
                        logger.debug(
                            f"Updated bond {bond.address[:20]}... with UTXO "
                            f"{bond_utxo.txid[:16]}...:{bond_utxo.vout}, value={bond_utxo.value}"
                        )

                # Save updated registry
                save_registry(bond_registry, resolved_data_dir)

            # Get current block height for bond proof generation
            self.current_block_height = await self.backend.get_block_height()
            logger.debug(f"Current block height: {self.current_block_height}")

            total_balance = await self.wallet.get_total_balance()
            logger.info(f"Wallet synced. Total balance: {total_balance:,} sats")

            # Find fidelity bond for proof generation
            # If a specific bond is selected in config, use it; otherwise use the best one
            if self.config.selected_fidelity_bond:
                # User specified a specific bond
                sel_txid, sel_vout = self.config.selected_fidelity_bond
                bonds = find_fidelity_bonds(self.wallet)
                self.fidelity_bond = next(
                    (b for b in bonds if b.txid == sel_txid and b.vout == sel_vout), None
                )
                if self.fidelity_bond:
                    logger.info(
                        f"Using selected fidelity bond: {sel_txid[:16]}...:{sel_vout}, "
                        f"value={self.fidelity_bond.value:,} sats, "
                        f"bond_value={self.fidelity_bond.bond_value:,}"
                    )
                else:
                    logger.warning(
                        f"Selected fidelity bond {sel_txid[:16]}...:{sel_vout} not found, "
                        "falling back to best available"
                    )
                    self.fidelity_bond = get_best_fidelity_bond(self.wallet)
            else:
                # Auto-select the best (largest bond value) fidelity bond
                self.fidelity_bond = get_best_fidelity_bond(self.wallet)
            if self.fidelity_bond:
                logger.info(
                    f"Fidelity bond found: {self.fidelity_bond.txid[:16]}..., "
                    f"value={self.fidelity_bond.value:,} sats, "
                    f"bond_value={self.fidelity_bond.bond_value:,}"
                )
            else:
                logger.info("No fidelity bond found (offers will have no bond proof)")

            logger.info("Creating offers...")
            self.current_offers = await self.offer_manager.create_offers()

            # If no offers due to insufficient balance, wait and retry
            retry_count = 0
            max_retries = 30  # 5 minutes max wait (30 * 10s)
            while not self.current_offers and retry_count < max_retries:
                retry_count += 1
                logger.warning(
                    f"No offers created (insufficient balance?). "
                    f"Waiting 10s and retrying... (attempt {retry_count}/{max_retries})"
                )
                await asyncio.sleep(10)

                # Re-sync wallet to check for new funds
                # NOTE(review): unlike the initial sync above, this call omits
                # fidelity_bond_addresses — confirm sync_all() retains previously
                # scanned bond addresses, otherwise bond UTXOs may drop from cache.
                await self.wallet.sync_all()
                total_balance = await self.wallet.get_total_balance()
                logger.info(f"Wallet re-synced. Total balance: {total_balance:,} sats")

                self.current_offers = await self.offer_manager.create_offers()

            if not self.current_offers:
                logger.error(
                    f"No offers created after {max_retries} retries. "
                    "Please fund the wallet and restart."
                )
                return

            # Set up ephemeral hidden service via Tor control port if enabled
            # This must happen before connecting to directory servers so we can
            # advertise the onion address
            ephemeral_onion = await self._setup_tor_hidden_service()
            if ephemeral_onion:
                # Override onion_host with the dynamically created one
                # (object.__setattr__ bypasses attribute protection — presumably
                # the config is a frozen model; TODO confirm)
                object.__setattr__(self.config, "onion_host", ephemeral_onion)
                logger.info(f"Using ephemeral onion address: {ephemeral_onion}")

            # Determine the onion address to advertise
            onion_host = self.config.onion_host

            logger.info("Connecting to directory servers...")
            for dir_server in self.config.directory_servers:
                try:
                    parts = dir_server.split(":")
                    host = parts[0]
                    port = int(parts[1]) if len(parts) > 1 else 5222

                    # Determine location for handshake:
                    # If we have an onion_host configured (static or ephemeral),
                    # advertise it with port
                    # Otherwise, use NOT-SERVING-ONION
                    if onion_host:
                        location = f"{onion_host}:{self.config.onion_serving_port}"
                    else:
                        location = "NOT-SERVING-ONION"

                    # Advertise neutrino_compat if our backend can provide extended UTXO metadata.
                    # This tells Neutrino takers that we can provide scriptpubkey and blockheight.
                    # Full nodes (Bitcoin Core) can provide this; light clients (Neutrino) cannot.
                    neutrino_compat = self.backend.can_provide_neutrino_metadata()

                    # Create DirectoryClient with SOCKS config for Tor connections
                    client = DirectoryClient(
                        host=host,
                        port=port,
                        network=self.config.network.value,
                        nick_identity=self.nick_identity,
                        location=location,
                        socks_host=self.config.socks_host,
                        socks_port=self.config.socks_port,
                        neutrino_compat=neutrino_compat,
                    )

                    await client.connect()
                    node_id = f"{host}:{port}"
                    self.directory_clients[node_id] = client

                    logger.info(f"Connected to directory: {dir_server}")

                except Exception as e:
                    # A single unreachable directory is tolerated; we only abort
                    # when none could be reached (checked just below).
                    logger.error(f"Failed to connect to {dir_server}: {e}")

            if not self.directory_clients:
                logger.error("Failed to connect to any directory server")
                return

            # Start hidden service listener if we have an onion address (static or ephemeral)
            if onion_host:
                logger.info(
                    f"Starting hidden service listener on "
                    f"{self.config.onion_serving_host}:{self.config.onion_serving_port}..."
                )
                self.hidden_service_listener = HiddenServiceListener(
                    host=self.config.onion_serving_host,
                    port=self.config.onion_serving_port,
                    on_connection=self._on_direct_connection,
                )
                await self.hidden_service_listener.start()
                logger.info(f"Hidden service listener started (onion: {onion_host})")

            logger.info("Announcing offers...")
            await self._announce_offers()

            logger.info("Maker bot started. Listening for takers...")
            self.running = True

            # Start listening on all directory clients
            for node_id, client in self.directory_clients.items():
                task = asyncio.create_task(self._listen_client(node_id, client))
                self.listen_tasks.append(task)

            # If hidden service listener is running, start serve_forever task
            if self.hidden_service_listener:
                task = asyncio.create_task(self.hidden_service_listener.serve_forever())
                self.listen_tasks.append(task)

            # Start background task to monitor pending transactions
            monitor_task = asyncio.create_task(self._monitor_pending_transactions())
            self.listen_tasks.append(monitor_task)

            # Start periodic wallet rescan task
            rescan_task = asyncio.create_task(self._periodic_rescan())
            self.listen_tasks.append(rescan_task)

            # Start periodic rate limit status logging task
            status_task = asyncio.create_task(self._periodic_rate_limit_status())
            self.listen_tasks.append(status_task)

            # Start periodic directory connection status logging task
            conn_status_task = asyncio.create_task(self._periodic_directory_connection_status())
            self.listen_tasks.append(conn_status_task)

            # Wait for all listening tasks to complete
            await asyncio.gather(*self.listen_tasks, return_exceptions=True)

        except Exception as e:
            logger.error(f"Failed to start maker bot: {e}")
            raise

    async def stop(self) -> None:
        """Shut down the bot: cancel background tasks, close listeners and connections."""
        logger.info("Stopping maker bot...")
        self.running = False

        # Cancel every background task, then wait for them all to unwind.
        for task in self.listen_tasks:
            task.cancel()
        if self.listen_tasks:
            await asyncio.gather(*self.listen_tasks, return_exceptions=True)

        # Tear down the inbound hidden-service listener, if one was started.
        if self.hidden_service_listener:
            await self.hidden_service_listener.stop()

        # Dropping the Tor control connection also removes the ephemeral service.
        await self._cleanup_tor_hidden_service()

        # Best-effort close of every direct peer connection.
        for connection in self.direct_connections.values():
            try:
                await connection.close()
            except Exception:
                pass
        self.direct_connections.clear()

        # Best-effort close of every directory client.
        for directory_client in self.directory_clients.values():
            try:
                await directory_client.close()
            except Exception:
                pass

        await self.wallet.close()
        logger.info("Maker bot stopped")

    def _cleanup_timed_out_sessions(self) -> None:
        """Drop timed-out CoinJoin sessions and prune stale rate-limiter entries."""
        expired = [
            nick for nick, session in self.active_sessions.items() if session.is_timed_out()
        ]

        for nick in expired:
            session = self.active_sessions.pop(nick)
            age = int(asyncio.get_event_loop().time() - session.created_at)
            logger.warning(
                f"Cleaning up timed-out session with {nick} (state: {session.state}, age: {age}s)"
            )

        # Keep the limiter's per-peer bookkeeping from growing without bound.
        self._orderbook_rate_limiter.cleanup_old_entries()

    async def _resync_wallet_and_update_offers(self) -> None:
        """Re-sync the wallet and refresh offers when the balance moved.

        Core rescan routine shared by the post-CoinJoin resync and the
        periodic rescan task:
        1. Snapshot the current maximum per-mixdepth balance
        2. Re-sync the wallet (refreshing block height and pending history)
        3. Recreate and re-announce offers when that maximum changed
        """

        async def _max_mixdepth_balance() -> int:
            # Largest single-mixdepth balance; this is what bounds offer maxsize.
            best = 0
            for mixdepth in range(self.wallet.mixdepth_count):
                best = max(best, await self.wallet.get_balance(mixdepth))
            return best

        old_max_balance = await _max_mixdepth_balance()

        await self.wallet.sync_all()

        # Refresh the cached block height used for bond proof generation.
        self.current_block_height = await self.backend.get_block_height()
        logger.debug(f"Updated block height: {self.current_block_height}")

        # Update pending history immediately after sync (in case of restart)
        await self._update_pending_history()

        new_max_balance = await _max_mixdepth_balance()

        total_balance = await self.wallet.get_total_balance()
        logger.info(f"Wallet re-synced. Total balance: {total_balance:,} sats")

        # Only recreate offers when the per-mixdepth maximum actually changed.
        if old_max_balance != new_max_balance:
            logger.info(
                f"Max balance changed: {old_max_balance:,} -> {new_max_balance:,} sats. "
                "Updating offers..."
            )
            await self._update_offers()
        else:
            logger.debug(f"Max balance unchanged at {new_max_balance:,} sats")

    async def _update_offers(self) -> None:
        """Recreate and re-announce offers from the current wallet state.

        Invoked whenever the balance may have moved (CoinJoin completion,
        external spend, deposit) so the maker adapts without a restart.
        """
        try:
            new_offers = await self.offer_manager.create_offers()

            if not new_offers:
                logger.warning(
                    "No offers could be created (insufficient balance?). "
                    "Keeping existing offers active."
                )
                return

            # Skip the re-announcement when the headline maxsize is unchanged.
            if self.current_offers and new_offers[0].maxsize == self.current_offers[0].maxsize:
                logger.debug("Offer maxsize unchanged, skipping re-announcement")
                return

            self.current_offers = new_offers
            await self._announce_offers()
            logger.info(f"Updated and re-announced offers: maxsize={new_offers[0].maxsize:,} sats")
        except Exception as e:
            logger.error(f"Failed to update offers: {e}")

    async def _periodic_rescan(self) -> None:
        """Background task: rescan the wallet on a fixed interval.

        Every `rescan_interval_sec` (default: 10 minutes) this
        1. Detects external transactions (deposits, Sparrow spends, etc.)
        2. Updates pending transaction confirmations
        3. Refreshes offers when the balance changed

        Lets the maker run unattended and track balance changes without
        manual intervention.
        """
        logger.info(
            f"Starting periodic rescan task (interval: {self.config.rescan_interval_sec}s)..."
        )

        while self.running:
            try:
                await asyncio.sleep(self.config.rescan_interval_sec)
                if not self.running:
                    break
                logger.info("Periodic wallet rescan starting...")
                await self._resync_wallet_and_update_offers()
            except asyncio.CancelledError:
                logger.info("Periodic rescan task cancelled")
                break
            except Exception as e:
                # Keep the loop alive: one failed rescan must not end the task.
                logger.error(f"Error in periodic rescan: {e}")

        logger.info("Periodic rescan task stopped")

    async def _periodic_rate_limit_status(self) -> None:
        """Background task: log rate-limiting statistics roughly once per hour.

        Provides visibility into spam/abuse without flooding the logs:
        total violations, currently banned peers, and the top violators.
        """
        # Delay the first report so there is some activity to summarize.
        await asyncio.sleep(600)

        while self.running:
            try:
                stats = self._orderbook_rate_limiter.get_statistics()

                # Stay quiet unless something actually happened.
                if stats["total_violations"] > 0 or stats["banned_peers"]:
                    banned = stats["banned_peers"]
                    banned_list = ", ".join(banned[:5])
                    if len(banned) > 5:
                        banned_list += f", ... and {len(banned) - 5} more"

                    top_violators_str = ", ".join(
                        f"{nick}({count})" for nick, count in stats["top_violators"][:5]
                    )

                    message = (
                        f"Rate limit status: {stats['total_violations']} total violations, "
                        f"{len(banned)} banned peer(s)"
                    )
                    if len(banned) > 0:
                        message += f" [{banned_list}]"
                    if stats["top_violators"]:
                        message += f", top violators: {top_violators_str}"
                    logger.info(message)

                # Next report in an hour.
                await asyncio.sleep(3600)

            except asyncio.CancelledError:
                logger.info("Rate limit status task cancelled")
                break
            except Exception as e:
                logger.error(f"Error in rate limit status task: {e}")
                await asyncio.sleep(3600)

        logger.info("Rate limit status task stopped")

    async def _periodic_directory_connection_status(self) -> None:
        """Background task to periodically log directory connection status.

        Runs every 10 minutes to provide visibility into orderbook
        connectivity: total directory servers configured, currently connected
        servers, and disconnected servers (if any).
        """
        # First log after 5 minutes (give time for initial connection)
        await asyncio.sleep(300)

        while self.running:
            try:
                total_servers = len(self.config.directory_servers)
                connected_servers = list(self.directory_clients.keys())
                connected_count = len(connected_servers)

                # Normalize each configured "host[:port]" entry to the
                # "host:port" node id used as the directory_clients key
                # (default port 5222, mirroring the parsing in start()).
                # Previously this re-split each server string three times
                # inside a nested f-string; split once per server instead.
                disconnected_servers = []
                for server in self.config.directory_servers:
                    parts = server.split(":")
                    node_id = f"{parts[0]}:{parts[1] if len(parts) > 1 else '5222'}"
                    if node_id not in connected_servers:
                        disconnected_servers.append(server)

                if disconnected_servers:
                    disconnected_str = ", ".join(disconnected_servers[:5])
                    if len(disconnected_servers) > 5:
                        disconnected_str += f", ... and {len(disconnected_servers) - 5} more"
                    logger.warning(
                        f"Directory connection status: {connected_count}/{total_servers} "
                        f"connected. Disconnected: [{disconnected_str}]"
                    )
                else:
                    logger.info(
                        f"Directory connection status: {connected_count}/{total_servers} connected "
                        f"[{', '.join(connected_servers)}]"
                    )

                # Log again in 10 minutes
                await asyncio.sleep(600)

            except asyncio.CancelledError:
                logger.info("Directory connection status task cancelled")
                break
            except Exception as e:
                logger.error(f"Error in directory connection status task: {e}")
                await asyncio.sleep(600)

        logger.info("Directory connection status task stopped")

    async def _monitor_pending_transactions(self) -> None:
        """
        Background task that tracks pending transactions' confirmation status.

        Refreshes the history file every 60 seconds; transactions are marked
        as successful once they receive their first confirmation.
        """
        logger.info("Starting pending transaction monitor...")
        poll_interval = 60.0  # seconds between history refreshes

        while self.running:
            try:
                await asyncio.sleep(poll_interval)
                await self._update_pending_history()
            except asyncio.CancelledError:
                logger.info("Pending transaction monitor cancelled")
                break
            except Exception as e:
                # One failed refresh must not stop the monitor.
                logger.error(f"Error in pending transaction monitor: {e}")

        logger.info("Pending transaction monitor stopped")

    async def _update_pending_history(self) -> None:
        """Check pending transactions and update their confirmation status.

        For entries without a txid, first attempts to discover the txid by
        checking whether the destination address has received funds. An entry
        is marked confirmed in the history file once its transaction has at
        least one confirmation. Transactions still not found more than 24
        hours after their timestamp are logged as possibly rejected but are
        left pending.
        """
        # Local imports avoid import cycles with jmwallet; the datetime
        # import is hoisted here instead of being re-imported per entry
        # inside the loop (as it was previously).
        from datetime import datetime

        from jmwallet.history import (
            get_pending_transactions,
            update_pending_transaction_txid,
            update_transaction_confirmation,
        )

        pending = get_pending_transactions(data_dir=self.config.data_dir)
        if not pending:
            return

        logger.debug(f"Checking {len(pending)} pending transaction(s)...")

        for entry in pending:
            try:
                # If the entry has no txid, try to discover it from the chain.
                if not entry.txid:
                    if not entry.destination_address:
                        logger.debug("Pending entry has no txid and no destination address")
                        continue

                    logger.debug(
                        f"Attempting to discover txid for pending entry "
                        f"(dest: {entry.destination_address[:20]}...)"
                    )
                    # Look for the txid that paid to our CoinJoin address
                    txid = await self._discover_txid_for_address(entry.destination_address)
                    if not txid:
                        logger.debug(
                            f"No UTXO found for {entry.destination_address[:20]}... "
                            f"(tx may not be confirmed yet)"
                        )
                        continue

                    update_pending_transaction_txid(
                        destination_address=entry.destination_address,
                        txid=txid,
                        data_dir=self.config.data_dir,
                    )
                    logger.info(
                        f"Discovered txid {txid[:16]}... for address "
                        f"{entry.destination_address[:20]}..."
                    )
                    # Use the discovered txid for the confirmation check below
                    entry.txid = txid

                # Check if transaction exists and get confirmations
                tx_info = await self.backend.get_transaction(entry.txid)

                if tx_info is None:
                    # Transaction not found - might have been rejected/replaced
                    timestamp = datetime.fromisoformat(entry.timestamp)
                    age_hours = (datetime.now() - timestamp).total_seconds() / 3600

                    if age_hours > 24:
                        logger.warning(
                            f"Transaction {entry.txid[:16]}... not found after "
                            f"{age_hours:.1f} hours, may have been rejected"
                        )
                    continue

                confirmations = tx_info.confirmations

                # Mark as successful only on the transition to first confirmation
                if confirmations > 0 and entry.confirmations == 0:
                    logger.info(
                        f"Transaction {entry.txid[:16]}... confirmed "
                        f"({confirmations} confirmation(s))"
                    )
                    update_transaction_confirmation(
                        txid=entry.txid,
                        confirmations=confirmations,
                        data_dir=self.config.data_dir,
                    )

            except Exception as e:
                # Per-entry errors must not abort the whole sweep.
                txid_str = entry.txid[:16] if entry.txid else "unknown"
                logger.debug(f"Error checking transaction {txid_str}...: {e}")
    async def _discover_txid_for_address(self, address: str) -> str | None:
        """Try to discover the txid for a transaction that paid to an address.

        This is used when a maker history entry doesn't have a txid recorded
        (e.g., from older versions or if the txid wasn't captured).

        Args:
            address: The destination address to check

        Returns:
            Transaction ID if found, None otherwise
        """
        try:
            # Get UTXOs for this address - if there are any, the first one's txid
            # is likely our CoinJoin (assuming fresh addresses are used)
            utxos = await self.backend.get_utxos([address])
            if utxos:
                # Return the txid of the first (and likely only) UTXO
                return utxos[0].txid
            return None
        except Exception as e:
            logger.debug(f"Error discovering txid for {address[:20]}...: {e}")
            return None

    async def _deferred_wallet_resync(self) -> None:
        """Resync wallet in background after a CoinJoin completes.

        Scheduled via asyncio.create_task from the !tx handler so that
        handling of the taker's follow-up !push message is not blocked.
        """
        try:
            # Small delay to allow transaction to propagate
            await asyncio.sleep(2)
            logger.info("Performing deferred wallet resync after CoinJoin...")
            await self._resync_wallet_and_update_offers()
        except Exception as e:
            # Best-effort: runs as a detached task, so errors are only logged.
            logger.error(f"Error in deferred wallet resync: {e}")

    async def _announce_offers(self) -> None:
        """Announce offers to all connected directory servers (public broadcast, NO bonds)"""
        for offer in self.current_offers:
            offer_msg = self._format_offer_announcement(offer, include_bond=False)

            for client in self.directory_clients.values():
                try:
                    await client.send_public_message(offer_msg)
                    logger.debug("Announced offer to directory")
                except Exception as e:
                    logger.error(f"Failed to announce offer: {e}")

    def _format_offer_announcement(self, offer: Offer, include_bond: bool = False) -> str:
        """Format offer for announcement.

        Format: <ordertype> <oid> <minsize> <maxsize> <txfee> <cjfee>[!tbond <proof>]

        Args:
            offer: The offer to format
            include_bond: If True, append fidelity bond proof (for PRIVMSG only)

        Note:
            According to the JoinMarket protocol:
            - Public broadcasts: NO fidelity bond proof
            - Private responses to !orderbook: Include !tbond <proof>
        """

        order_type_str = offer.ordertype.value

        # NOTE: Don't include nick!PUBLIC! prefix here - send_public_message() adds it
        msg = (
            f"{order_type_str} "
            f"{offer.oid} {offer.minsize} {offer.maxsize} "
            f"{offer.txfee} {offer.cjfee}"
        )

        # Append fidelity bond proof ONLY for private responses
        if include_bond and self.fidelity_bond is not None:
            # For private response, we use the requesting taker's nick
            # The ownership signature proves we control the UTXO
            bond_proof = create_fidelity_bond_proof(
                bond=self.fidelity_bond,
                maker_nick=self.nick,
                taker_nick=self.nick,  # Will be updated when sending to specific taker
                current_block_height=self.current_block_height,
            )
            if bond_proof:
                msg += f"!tbond {bond_proof}"
                logger.debug(
                    f"Added fidelity bond proof to offer (proof length: {len(bond_proof)})"
                )

        return msg

    async def _listen_client(self, node_id: str, client: DirectoryClient) -> None:
        """Listen for messages from a specific directory client"""
        logger.info(f"Started listening on {node_id}")

        # Track last cleanup time
        last_cleanup = asyncio.get_event_loop().time()
        cleanup_interval = 60.0  # Clean up timed-out sessions every 60 seconds

        while self.running:
            try:
                # Use listen_for_messages with short duration to check running flag frequently
                messages = await client.listen_for_messages(duration=1.0)

                for message in messages:
                    await self._handle_message(message)

                # Periodic cleanup of timed-out sessions
                now = asyncio.get_event_loop().time()
                if now - last_cleanup > cleanup_interval:
                    self._cleanup_timed_out_sessions()
                    last_cleanup = now

            except asyncio.CancelledError:
                logger.info(f"Listener for {node_id} cancelled")
                break
            except DirectoryClientError as e:
                # Connection lost - exit listener, let reconnection logic handle it
                logger.warning(f"Connection lost on {node_id}: {e}")
                break
            except Exception as e:
                logger.error(f"Error listening on {node_id}: {e}")
                await asyncio.sleep(1.0)

        logger.info(f"Stopped listening on {node_id}")

    async def _handle_message(self, message: dict[str, Any]) -> None:
        """Dispatch one incoming directory message by its type field.

        Applies generic per-peer rate limiting keyed on the sender nick
        (line format: ``from_nick!to_nick!msg``) before dispatching to the
        PRIVMSG/PUBMSG handlers.
        """
        try:
            from jmcore.protocol import MessageType

            kind = message.get("type")
            raw_line = message.get("line", "")

            # Sender is everything before the first command prefix; split()
            # always yields at least one element, so this is safe even for
            # lines without a separator.
            sender = raw_line.split(COMMAND_PREFIX)[0]

            # Apply generic per-peer rate limiting
            if not self._message_rate_limiter.check(sender):
                violations = self._message_rate_limiter.get_violation_count(sender)
                # Only log every 50th violation to prevent log flooding
                if violations % 50 == 0:
                    logger.warning(
                        f"Rate limit exceeded for {sender} ({violations} violations total)"
                    )
                return  # Drop the message

            if kind == MessageType.PRIVMSG.value:
                await self._handle_privmsg(raw_line)
            elif kind == MessageType.PUBMSG.value:
                await self._handle_pubmsg(raw_line)
            elif kind == MessageType.PEERLIST.value:
                logger.debug(f"Received peerlist: {raw_line[:50]}...")
            else:
                logger.debug(f"Ignoring message type {kind}")

        except Exception as e:
            logger.error(f"Failed to handle message: {e}")

    async def _handle_pubmsg(self, line: str) -> None:
        """Handle a public (broadcast) message.

        Recognized commands addressed to PUBLIC:
        - ``!orderbook``: reply with our offers via PRIVMSG (rate limited
          per requesting nick, with tiered log suppression).
        - ``!hp2 <commitment>``: a PoDLE commitment broadcast, forwarded to
          the blacklisting handler.

        Args:
            line: Raw message line in ``from_nick!to_nick!payload`` format.
        """
        try:
            parts = line.split(COMMAND_PREFIX)
            if len(parts) < 3:
                return

            from_nick = parts[0]
            to_nick = parts[1]
            # Re-join in case the payload itself contained the prefix.
            rest = COMMAND_PREFIX.join(parts[2:])

            # Ignore our own messages
            if from_nick == self.nick:
                return

            # Strip leading "!" and get command
            command = rest.strip().lstrip("!")

            # Respond to orderbook requests with PRIVMSG (including bond if available)
            if to_nick == "PUBLIC" and command == "orderbook":
                # Apply rate limiting to prevent spam attacks
                if not self._orderbook_rate_limiter.check(from_nick):
                    violations = self._orderbook_rate_limiter.get_violation_count(from_nick)
                    is_banned = self._orderbook_rate_limiter.is_banned(from_nick)

                    # Only log rate limiting (not bans) at specific violation milestones
                    # to prevent log flooding:
                    # - First violation (violations == 1)
                    # - Every 10th violation when not banned (10, 20, 30, etc.)
                    # Note: Ban events are already logged by check() method, so we skip
                    # logging here to avoid duplicate log messages
                    if not is_banned:
                        should_log = violations <= 1 or violations % 10 == 0

                        if should_log:
                            # Show backoff level for context
                            if violations >= self.config.orderbook_violation_severe_threshold:
                                backoff_level = "SEVERE"
                            elif violations >= self.config.orderbook_violation_warning_threshold:
                                backoff_level = "MODERATE"
                            else:
                                backoff_level = "NORMAL"

                            logger.debug(
                                f"Rate limiting orderbook request from {from_nick} "
                                f"(violations: {violations}, backoff: {backoff_level})"
                            )
                    return

                logger.info(
                    f"Received !orderbook request from {from_nick}, sending offers via PRIVMSG"
                )
                await self._send_offers_to_taker(from_nick)
            elif to_nick == "PUBLIC" and command.startswith("hp2"):
                # hp2 via pubmsg = commitment broadcast for blacklisting
                await self._handle_hp2_pubmsg(from_nick, command)

        except Exception as e:
            logger.error(f"Failed to handle pubmsg: {e}")

    async def _send_offers_to_taker(self, taker_nick: str) -> None:
        """Send offers to a specific taker via PRIVMSG, including fidelity bond if available.

        This is called when we receive a !orderbook request from a taker.
        According to the JoinMarket protocol, fidelity bonds must ONLY be sent
        via PRIVMSG, never in public broadcasts.

        For each offer:
        1. Format the offer parameters
        2. If we have a fidelity bond, create a proof signed for this specific taker
        3. Append !tbond <proof> to the offer data
        4. Send via PRIVMSG to the taker

        Message format:
            send_private_message(
                taker_nick,
                command="sw0reloffer",
                data="0 2500000 ... !tbond <proof>"
            )
            Results in: from_nick!taker_nick!sw0reloffer 0 2500000 ... !tbond <proof> <sig>

        Args:
            taker_nick: The nick of the taker requesting the orderbook
        """
        try:
            for offer in self.current_offers:
                # Format offer data (parameters without the command)
                order_type_str = offer.ordertype.value
                data = f"{offer.oid} {offer.minsize} {offer.maxsize} {offer.txfee} {offer.cjfee}"

                # Append fidelity bond proof if we have one
                # CRITICAL: The bond proof must be signed with the taker's nick
                if self.fidelity_bond is not None:
                    bond_proof = create_fidelity_bond_proof(
                        bond=self.fidelity_bond,
                        maker_nick=self.nick,
                        taker_nick=taker_nick,  # Sign for THIS specific taker
                        current_block_height=self.current_block_height,
                    )
                    if bond_proof:
                        data += f"!tbond {bond_proof}"
                        logger.debug(
                            f"Including fidelity bond proof in offer to {taker_nick} "
                            f"(proof length: {len(bond_proof)})"
                        )

                # Send via all connected directory clients
                for client in self.directory_clients.values():
                    try:
                        # Send as PRIVMSG
                        # Format: taker_nick!maker_nick!<order_type> <data> <signature>
                        await client.send_private_message(taker_nick, order_type_str, data)
                        logger.debug(f"Sent {order_type_str} offer to {taker_nick}")
                    except Exception as e:
                        logger.error(f"Failed to send offer to {taker_nick} via directory: {e}")

        except Exception as e:
            logger.error(f"Failed to send offers to taker {taker_nick}: {e}")

    async def _handle_privmsg(self, line: str) -> None:
        """Dispatch a private (CoinJoin protocol) message addressed to us.

        Recognized commands: fill, auth, tx, push, and hp2 (a commitment
        transfer request that we re-broadcast publicly to obfuscate its
        source). Messages not addressed to our nick are ignored.
        """
        try:
            parts = line.split(COMMAND_PREFIX)
            if len(parts) < 3:
                return

            from_nick, to_nick = parts[0], parts[1]
            rest = COMMAND_PREFIX.join(parts[2:])

            # Only act on messages addressed to our own nick.
            if to_nick != self.nick:
                return

            # The !!command framing leaves a leading "!" on the payload.
            command = rest.strip().lstrip("!")

            dispatch = (
                ("fill", self._handle_fill),
                ("auth", self._handle_auth),
                ("tx", self._handle_tx),
                ("push", self._handle_push),
                ("hp2", self._handle_hp2_privmsg),
            )
            for prefix, handler in dispatch:
                if command.startswith(prefix):
                    await handler(from_nick, command)
                    return

            logger.debug(f"Unknown command: {command[:20]}...")

        except Exception as e:
            logger.error(f"Failed to handle privmsg: {e}")

    async def _handle_fill(self, taker_nick: str, msg: str) -> None:
        """Handle a !fill request from a taker, opening a CoinJoin session.

        Fill message format:
            fill <oid> <amount> <taker_nacl_pk> <commitment> [<signing_pk> <sig>]

        Validates the PoDLE commitment against the blacklist and the offer
        parameters before creating a session. On success, replies with our
        NaCl pubkey (!pubkey) so E2E encryption can be established.

        Args:
            taker_nick: Nick of the requesting taker
            msg: The command payload, starting with "fill"
        """
        try:
            parts = msg.split()
            if len(parts) < 5:
                logger.warning(f"Invalid !fill format (need at least 5 parts): {msg}")
                return

            offer_id = int(parts[1])
            amount = int(parts[2])
            taker_pk = parts[3]  # Taker's NaCl pubkey for E2E encryption
            commitment = parts[4]  # PoDLE commitment (with prefix like "P")

            # Strip commitment prefix if present (e.g., "P" for standard PoDLE)
            if commitment.startswith("P"):
                commitment = commitment[1:]

            # Check if commitment is already blacklisted
            if not check_commitment(commitment):
                logger.warning(
                    f"Rejecting !fill from {taker_nick}: commitment already used "
                    f"({commitment[:16]}...)"
                )
                return

            # Full bounds check: a plain `>= len(...)` test would let a
            # negative offer_id slip through and silently index from the
            # end of the offer list.
            if not 0 <= offer_id < len(self.current_offers):
                logger.warning(f"Invalid offer ID: {offer_id}")
                return

            offer = self.current_offers[offer_id]

            is_valid, error = self.offer_manager.validate_offer_fill(offer, amount)
            if not is_valid:
                logger.warning(f"Invalid fill request: {error}")
                return

            session = CoinJoinSession(
                taker_nick=taker_nick,
                offer=offer,
                wallet=self.wallet,
                backend=self.backend,
                session_timeout_sec=self.config.session_timeout_sec,
                merge_algorithm=self.config.merge_algorithm.value,
            )

            # Pass the taker's NaCl pubkey for setting up encryption
            success, response = await session.handle_fill(amount, commitment, taker_pk)

            if success:
                self.active_sessions[taker_nick] = session
                logger.info(f"Created CoinJoin session with {taker_nick}")

                await self._send_response(taker_nick, "pubkey", response)
            else:
                logger.warning(f"Failed to handle fill: {response.get('error')}")

        except Exception as e:
            logger.error(f"Failed to handle !fill: {e}")

    async def _handle_auth(self, taker_nick: str, msg: str) -> None:
        """Handle !auth request from taker.

        The auth message is ENCRYPTED using NaCl.
        Format: auth <encrypted_base64> [<signing_pk> <sig>]

        After decryption, the plaintext is pipe-separated:
        txid:vout|P|P2|sig|e

        On a valid PoDLE revelation we reply with !ioauth and broadcast the
        commitment via !hp2 for network-wide blacklisting; on a failed
        verification the session is dropped.

        Args:
            taker_nick: Nick of the taker; must have an active session
                created by a prior !fill.
            msg: The command payload, starting with "auth".
        """
        try:
            # An !auth without a preceding !fill has no session to attach to.
            if taker_nick not in self.active_sessions:
                logger.warning(f"No active session for {taker_nick}")
                return

            session = self.active_sessions[taker_nick]

            logger.info(f"Received !auth from {taker_nick}, decrypting and verifying PoDLE...")

            # Parse: auth <encrypted_base64> [<signing_pk> <sig>]
            parts = msg.split()
            if len(parts) < 2:
                logger.error("Invalid !auth format: missing encrypted data")
                return

            encrypted_data = parts[1]

            # Decrypt the auth message (keys were exchanged during !fill/!pubkey)
            if not session.crypto.is_encrypted:
                logger.error("Encryption not set up for this session")
                return

            try:
                decrypted = session.crypto.decrypt(encrypted_data)
                logger.debug(f"Decrypted auth message length: {len(decrypted)}")
            except Exception as e:
                logger.error(f"Failed to decrypt auth message: {e}")
                return

            # Parse the decrypted revelation - pipe-separated format:
            # txid:vout|P|P2|sig|e
            try:
                revelation_parts = decrypted.split("|")
                if len(revelation_parts) != 5:
                    logger.error(
                        f"Invalid revelation format: expected 5 parts, got {len(revelation_parts)}"
                    )
                    return

                utxo_str, p_hex, p2_hex, sig_hex, e_hex = revelation_parts

                # Parse utxo
                if ":" not in utxo_str:
                    logger.error(f"Invalid utxo format: {utxo_str}")
                    return

                # Validate utxo format (txid:vout); rsplit keeps any ":" in
                # the txid part intact and isolates the vout suffix.
                if not utxo_str.rsplit(":", 1)[1].isdigit():
                    logger.error(f"Invalid vout in utxo: {utxo_str}")
                    return

                # parse_podle_revelation expects hex strings, not bytes
                revelation = {
                    "utxo": utxo_str,
                    "P": p_hex,
                    "P2": p2_hex,
                    "sig": sig_hex,
                    "e": e_hex,
                }
                logger.debug(f"Parsed revelation: utxo={utxo_str}, P={p_hex[:16]}...")
            except Exception as e:
                logger.error(f"Failed to parse revelation: {e}")
                return

            # The commitment was already stored from the !fill message
            commitment = self.active_sessions[taker_nick].commitment.hex()

            # kphex is empty for now - we don't use it yet
            kphex = ""

            success, response = await session.handle_auth(commitment, revelation, kphex)

            if success:
                await self._send_response(taker_nick, "ioauth", response)

                # Broadcast the commitment via hp2 so other makers can blacklist it
                # This prevents reuse of commitments in future CoinJoin attempts
                await self._broadcast_commitment(commitment)
            else:
                logger.error(f"Auth failed: {response.get('error')}")
                # Failed PoDLE verification ends the session immediately.
                del self.active_sessions[taker_nick]

        except Exception as e:
            logger.error(f"Failed to handle !auth: {e}")

    async def _handle_tx(self, taker_nick: str, msg: str) -> None:
        """Handle !tx request from taker.

        The tx message is ENCRYPTED using NaCl.
        Format: tx <encrypted_base64> [<signing_pk> <sig>]

        After decryption, the plaintext is base64-encoded transaction bytes.

        If the session verifies the transaction, we reply with one !sig
        message per input signature, record the CoinJoin in our history, and
        schedule a background wallet resync. In both the success and failure
        cases the session is removed afterwards.

        Args:
            taker_nick: Nick of the taker; must have an active session.
            msg: The command payload, starting with "tx".
        """
        try:
            if taker_nick not in self.active_sessions:
                logger.warning(f"No active session for {taker_nick}")
                return

            session = self.active_sessions[taker_nick]

            logger.info(f"Received !tx from {taker_nick}, decrypting and verifying transaction...")

            # Parse: tx <encrypted_base64> [<signing_pk> <sig>]
            parts = msg.split()
            if len(parts) < 2:
                logger.warning("Invalid !tx format")
                return

            encrypted_data = parts[1]

            # Decrypt the tx message
            if not session.crypto.is_encrypted:
                logger.error("Encryption not set up for this session")
                return

            try:
                decrypted = session.crypto.decrypt(encrypted_data)
                logger.debug(f"Decrypted tx message length: {len(decrypted)}")
            except Exception as e:
                logger.error(f"Failed to decrypt tx message: {e}")
                return

            # The decrypted content is base64-encoded transaction
            import base64

            try:
                tx_bytes = base64.b64decode(decrypted)
                tx_hex = tx_bytes.hex()
            except Exception as e:
                logger.error(f"Failed to decode transaction: {e}")
                return

            success, response = await session.handle_tx(tx_hex)

            if success:
                # Send each signature as a separate message
                signatures = response.get("signatures", [])
                for sig in signatures:
                    await self._send_response(taker_nick, "sig", {"signature": sig})
                logger.info(f"CoinJoin with {taker_nick} COMPLETE ✓ (sent {len(signatures)} sigs)")

                # Record transaction in history (best-effort: failures only logged)
                try:
                    fee_received = session.offer.calculate_fee(session.amount)
                    txfee_contribution = session.offer.txfee
                    our_utxos = list(session.our_utxos.keys())

                    history_entry = create_maker_history_entry(
                        taker_nick=taker_nick,
                        cj_amount=session.amount,
                        fee_received=fee_received,
                        txfee_contribution=txfee_contribution,
                        cj_address=session.cj_address,
                        change_address=session.change_address,
                        our_utxos=our_utxos,
                        txid=response.get("txid"),
                        network=self.config.network.value,
                    )
                    append_history_entry(history_entry, data_dir=self.config.data_dir)
                    net = fee_received - txfee_contribution
                    logger.debug(f"Recorded CoinJoin in history: net fee {net} sats")
                except Exception as e:
                    logger.warning(f"Failed to record CoinJoin history: {e}")

                del self.active_sessions[taker_nick]

                # Schedule wallet re-sync in background to avoid blocking !push handling
                # The transaction hasn't been broadcast yet, so we should not block here
                asyncio.create_task(self._deferred_wallet_resync())
            else:
                logger.error(f"TX verification failed: {response.get('error')}")
                del self.active_sessions[taker_nick]

        except Exception as e:
            logger.error(f"Failed to handle !tx: {e}")

    async def _handle_push(self, taker_nick: str, msg: str) -> None:
        """Handle !push request from taker.

        The push message contains a base64-encoded signed transaction that the taker
        wants us to broadcast. This provides privacy benefits as the taker's IP is
        not linked to the transaction broadcast.

        Per JoinMarket protocol, makers broadcast "unquestioningly" - we already
        signed this transaction so it must be valid from our perspective. We don't
        verify or check the result, just broadcast and move on.

        Security considerations:
        - DoS risk: A malicious taker could spam !push messages with invalid data
        - Mitigation: Generic per-peer rate limiting (in directory server) prevents
          this from being a significant attack vector
        - We intentionally do NOT validate session state here to maintain protocol
          compatibility and simplicity. The rate limiter is the primary defense.

        Format: push <base64_transaction>
        """
        try:
            import base64

            parts = msg.split()
            if len(parts) < 2:
                logger.warning(f"Invalid !push format from {taker_nick}")
                return

            tx_b64 = parts[1]

            try:
                tx_bytes = base64.b64decode(tx_b64)
                tx_hex = tx_bytes.hex()
            except Exception as e:
                logger.error(f"Failed to decode !push transaction: {e}")
                return

            logger.info(f"Received !push from {taker_nick}, broadcasting transaction...")

            # Broadcast "unquestioningly" - we already signed it, so it's valid
            # from our perspective. Don't check the result.
            try:
                txid = await self.backend.broadcast_transaction(tx_hex)
                logger.info(f"Broadcast transaction for {taker_nick}: {txid}")
            except Exception as e:
                # Log but don't fail - the taker may have a fallback
                logger.warning(f"Failed to broadcast !push transaction: {e}")

        except Exception as e:
            logger.error(f"Failed to handle !push: {e}")

    async def _handle_hp2_pubmsg(self, from_nick: str, msg: str) -> None:
        """Blacklist a PoDLE commitment seen broadcast publicly via !hp2.

        Commitments announced in the public channel may come from failed or
        malicious CoinJoin attempts; recording them prevents their reuse.

        Format: hp2 <commitment_hex>
        """
        try:
            fields = msg.split()
            if len(fields) < 2:
                logger.debug(f"Invalid !hp2 format from {from_nick}: missing commitment")
                return

            commitment = fields[1]

            # add_commitment persists to disk and reports whether it was new.
            if add_commitment(commitment):
                logger.info(
                    f"Received commitment broadcast from {from_nick}, "
                    f"added to blacklist: {commitment[:16]}..."
                )
            else:
                logger.debug(
                    f"Received commitment broadcast from {from_nick}, "
                    f"already blacklisted: {commitment[:16]}..."
                )

        except Exception as e:
            logger.error(f"Failed to handle !hp2 pubmsg: {e}")

    async def _handle_hp2_privmsg(self, from_nick: str, msg: str) -> None:
        """Handle !hp2 commitment transfer via private message.

        When a maker receives !hp2 via privmsg, another maker is asking us to
        broadcast the commitment publicly. This provides obfuscation of the
        original source of the commitment broadcast.

        We simply re-broadcast it via pubmsg without verifying the commitment.

        Format: hp2 <commitment_hex>
        """
        try:
            parts = msg.split()
            if len(parts) < 2:
                logger.debug(f"Invalid !hp2 format from {from_nick}: missing commitment")
                return

            commitment = parts[1]
            logger.info(f"Received commitment transfer from {from_nick}, re-broadcasting...")

            # Broadcast the commitment publicly
            hp2_msg = f"hp2 {commitment}"
            for client in self.directory_clients.values():
                try:
                    await client.send_public_message(hp2_msg)
                except Exception as e:
                    logger.warning(f"Failed to broadcast hp2: {e}")

            logger.debug(f"Re-broadcast commitment: {commitment[:16]}...")

        except Exception as e:
            logger.error(f"Failed to handle !hp2 privmsg: {e}")

    async def _broadcast_commitment(self, commitment: str) -> None:
        """Publish a used PoDLE commitment via !hp2 for network-wide blacklisting.

        After successfully processing a taker's !auth, we record the
        commitment locally and announce it so other makers can blacklist it
        too, preventing the same commitment from being reused in future
        CoinJoin attempts - a key anti-Sybil mechanism in the reference
        implementation.
        """
        try:
            # Persist to our own blacklist first, before telling anyone else.
            add_commitment(commitment)

            announcement = f"hp2 {commitment}"
            for client in self.directory_clients.values():
                try:
                    await client.send_public_message(announcement)
                except Exception as e:
                    logger.warning(f"Failed to broadcast commitment: {e}")

            logger.debug(f"Broadcast commitment: {commitment[:16]}...")

        except Exception as e:
            logger.error(f"Failed to broadcast commitment: {e}")

    async def _send_response(self, taker_nick: str, command: str, data: dict[str, Any]) -> None:
        """Send a signed reply to a taker through every connected directory.

        Wire formats per command:
        - !pubkey <nacl_pubkey_hex>  -- NOT encrypted
        - !ioauth <encrypted_base64> -- ENCRYPTED
        - !sig <encrypted_base64>    -- ENCRYPTED

        The signature is appended: <message_content> <signing_pubkey> <sig_b64>
        The signature is over: <message_content> + hostid (NOT including the command!)

        For encrypted commands the space-separated plaintext values are
        encrypted and base64-encoded before signing. Signing itself is
        performed by the directory clients when the message is sent.
        """
        try:
            if command == "pubkey":
                # !pubkey <nacl_pubkey_hex> [features=<comma-separated>] - NOT encrypted
                # Features are optional and backwards compatible with legacy takers
                msg_content = data["nacl_pubkey"]
                feature_list = data.get("features", [])
                if feature_list:
                    msg_content += f" features={','.join(feature_list)}"
            elif command in ("ioauth", "sig"):
                if command == "ioauth":
                    # Plaintext: <utxo_list> <auth_pub> <cj_addr> <change_addr> <btc_sig>
                    plaintext = " ".join(
                        [
                            data["utxo_list"],
                            data["auth_pub"],
                            data["cj_addr"],
                            data["change_addr"],
                            data["btc_sig"],
                        ]
                    )
                else:
                    # Plaintext: <signature_base64>
                    # For multiple signatures, we send them one by one
                    plaintext = data["signature"]

                # The per-taker session holds the NaCl box used for encryption
                session = self.active_sessions.get(taker_nick)
                if session is None:
                    logger.error(f"No active session for {taker_nick} to encrypt {command}")
                    return
                msg_content = session.crypto.encrypt(plaintext)
                if command == "ioauth":
                    logger.debug(f"Encrypted ioauth message, plaintext_len={len(plaintext)}")
                else:
                    logger.debug(f"Encrypted sig: plaintext_len={len(plaintext)}")
            else:
                # Fallback to JSON for unknown commands
                msg_content = json.dumps(data)

            # Directory clients sign the message on our behalf before sending
            for directory in self.directory_clients.values():
                await directory.send_private_message(taker_nick, command, msg_content)

            logger.debug(f"Sent signed {command} to {taker_nick}")

        except Exception as e:
            logger.error(f"Failed to send response: {e}")

    async def _on_direct_connection(self, connection: TCPConnection, peer_str: str) -> None:
        """Handle incoming direct connection from a taker via hidden service.

        Direct connections use a simplified protocol compared to directory messages:
        - Messages are sent as newline-delimited JSON over TCP
        - Format: {"nick": "sender", "cmd": "command", "data": "..."}

        This bypasses the directory server for lower latency once the taker
        knows the maker's onion address (from the peerlist).

        Args:
            connection: The accepted TCP connection from the taker.
            peer_str: Human-readable peer address, used only for logging.

        Runs until the bot stops, the peer disconnects, or a processing
        error occurs; always closes the connection and removes it from
        ``self.direct_connections`` on exit.
        """
        logger.info(f"Handling direct connection from {peer_str}")

        try:
            # Keep connection open and process messages
            while self.running and connection.is_connected():
                try:
                    # Receive message with timeout
                    data = await asyncio.wait_for(connection.receive(), timeout=60.0)
                    if not data:
                        # Empty read means the peer closed the socket
                        logger.info(f"Direct connection from {peer_str} closed")
                        break

                    # Parse the message; malformed JSON is skipped, not fatal
                    try:
                        message = json.loads(data.decode("utf-8"))
                    except json.JSONDecodeError as e:
                        logger.warning(f"Invalid JSON from {peer_str}: {e}")
                        continue

                    sender_nick = message.get("nick", "unknown")
                    cmd = message.get("cmd", "")
                    msg_data = message.get("data", "")

                    logger.debug(f"Direct message from {sender_nick}: cmd={cmd}")

                    # Track this connection by nick for sending responses
                    if sender_nick != "unknown":
                        self.direct_connections[sender_nick] = connection

                    # Process the command - reuse existing handlers
                    # Commands: fill, auth, tx (same as via directory)
                    full_msg = f"{cmd} {msg_data}" if msg_data else cmd

                    if cmd == "fill":
                        await self._handle_fill(sender_nick, full_msg)
                    elif cmd == "auth":
                        await self._handle_auth(sender_nick, full_msg)
                    elif cmd == "tx":
                        await self._handle_tx(sender_nick, full_msg)
                    elif cmd == "push":
                        await self._handle_push(sender_nick, full_msg)
                    else:
                        logger.debug(f"Unknown direct command from {sender_nick}: {cmd}")

                except TimeoutError:
                    # No message within 60s; loop again so self.running and
                    # is_connected() are re-checked periodically
                    continue
                except Exception as e:
                    logger.error(f"Error processing direct message from {peer_str}: {e}")
                    break

        except Exception as e:
            logger.error(f"Error in direct connection handler for {peer_str}: {e}")
        finally:
            await connection.close()
            # Clean up nick -> connection mapping
            # (iterate a copy because we delete entries while scanning)
            for nick, conn in list(self.direct_connections.items()):
                if conn == connection:
                    del self.direct_connections[nick]
            logger.info(f"Direct connection from {peer_str} closed")

Main maker bot coordinating all components.

Methods

async def start(self) ‑> None
Expand source code
async def start(self) -> None:
    """
    Start the maker bot.

    Flow:
    1. Initialize commitment blacklist
    2. Sync wallet with blockchain
    3. Create ephemeral hidden service if tor_control enabled
    4. Connect to directory servers
    5. Create and announce offers
    6. Listen for taker requests

    Blocks until all listener tasks finish (i.e. until stop() cancels
    them or every task exits on its own). Re-raises any startup failure.
    """
    try:
        logger.info(f"Starting maker bot (nick: {self.nick})")

        # Initialize commitment blacklist with configured data directory
        set_blacklist_path(data_dir=self.config.data_dir)

        # Load fidelity bond addresses for optimized scanning
        # We scan wallet + fidelity bonds in a single pass to avoid two separate
        # scantxoutset calls (which take ~90s each on mainnet)
        from jmcore.paths import get_default_data_dir
        from jmwallet.wallet.bond_registry import load_registry

        resolved_data_dir = (
            self.config.data_dir if self.config.data_dir else get_default_data_dir()
        )
        # Tuples of (address, locktime, derivation_index)
        fidelity_bond_addresses: list[tuple[str, int, int]] = []

        # Option 1: Manual specification via fidelity_bond_index + locktimes (bypasses registry)
        # This is useful when running in Docker or when you don't have a registry yet
        if self.config.fidelity_bond_index is not None and self.config.fidelity_bond_locktimes:
            logger.info(
                f"Using manual fidelity bond specification: "
                f"index={self.config.fidelity_bond_index}, "
                f"locktimes={self.config.fidelity_bond_locktimes}"
            )
            for locktime in self.config.fidelity_bond_locktimes:
                address = self.wallet.get_fidelity_bond_address(
                    self.config.fidelity_bond_index, locktime
                )
                fidelity_bond_addresses.append(
                    (address, locktime, self.config.fidelity_bond_index)
                )
                logger.info(
                    f"Generated fidelity bond address for locktime {locktime}: {address}"
                )
        # Option 2: Load from registry (default)
        else:
            bond_registry = load_registry(resolved_data_dir)
            if bond_registry.bonds:
                # Extract (address, locktime, index) tuples from registry
                fidelity_bond_addresses = [
                    (bond.address, bond.locktime, bond.index) for bond in bond_registry.bonds
                ]
                logger.info(
                    f"Loaded {len(fidelity_bond_addresses)} "
                    f"fidelity bond address(es) from registry"
                )

        logger.info("Syncing wallet and fidelity bonds...")
        await self.wallet.sync_all(fidelity_bond_addresses)

        # Update bond registry with UTXO info from the scan (only if using registry)
        if self.config.fidelity_bond_index is None and fidelity_bond_addresses:
            from jmwallet.wallet.bond_registry import save_registry

            bond_registry = load_registry(resolved_data_dir)
            for bond in bond_registry.bonds:
                # Find the UTXO for this bond address in mixdepth 0
                bond_utxo = next(
                    (
                        utxo
                        for utxo in self.wallet.utxo_cache.get(0, [])
                        if utxo.address == bond.address
                    ),
                    None,
                )
                if bond_utxo:
                    # Update the bond registry with UTXO info
                    bond.txid = bond_utxo.txid
                    bond.vout = bond_utxo.vout
                    bond.value = bond_utxo.value
                    bond.confirmations = bond_utxo.confirmations
                    logger.debug(
                        f"Updated bond {bond.address[:20]}... with UTXO "
                        f"{bond_utxo.txid[:16]}...:{bond_utxo.vout}, value={bond_utxo.value}"
                    )

            # Save updated registry
            save_registry(bond_registry, resolved_data_dir)

        # Get current block height for bond proof generation
        self.current_block_height = await self.backend.get_block_height()
        logger.debug(f"Current block height: {self.current_block_height}")

        total_balance = await self.wallet.get_total_balance()
        logger.info(f"Wallet synced. Total balance: {total_balance:,} sats")

        # Find fidelity bond for proof generation
        # If a specific bond is selected in config, use it; otherwise use the best one
        if self.config.selected_fidelity_bond:
            # User specified a specific bond
            sel_txid, sel_vout = self.config.selected_fidelity_bond
            bonds = find_fidelity_bonds(self.wallet)
            self.fidelity_bond = next(
                (b for b in bonds if b.txid == sel_txid and b.vout == sel_vout), None
            )
            if self.fidelity_bond:
                logger.info(
                    f"Using selected fidelity bond: {sel_txid[:16]}...:{sel_vout}, "
                    f"value={self.fidelity_bond.value:,} sats, "
                    f"bond_value={self.fidelity_bond.bond_value:,}"
                )
            else:
                logger.warning(
                    f"Selected fidelity bond {sel_txid[:16]}...:{sel_vout} not found, "
                    "falling back to best available"
                )
                self.fidelity_bond = get_best_fidelity_bond(self.wallet)
        else:
            # Auto-select the best (largest bond value) fidelity bond
            self.fidelity_bond = get_best_fidelity_bond(self.wallet)
        if self.fidelity_bond:
            logger.info(
                f"Fidelity bond found: {self.fidelity_bond.txid[:16]}..., "
                f"value={self.fidelity_bond.value:,} sats, "
                f"bond_value={self.fidelity_bond.bond_value:,}"
            )
        else:
            logger.info("No fidelity bond found (offers will have no bond proof)")

        logger.info("Creating offers...")
        self.current_offers = await self.offer_manager.create_offers()

        # If no offers due to insufficient balance, wait and retry
        retry_count = 0
        max_retries = 30  # 5 minutes max wait (30 * 10s)
        while not self.current_offers and retry_count < max_retries:
            retry_count += 1
            logger.warning(
                f"No offers created (insufficient balance?). "
                f"Waiting 10s and retrying... (attempt {retry_count}/{max_retries})"
            )
            await asyncio.sleep(10)

            # Re-sync wallet to check for new funds
            # NOTE(review): no bond addresses are passed here, unlike the
            # initial sync_all call above -- confirm that skipping the bond
            # rescan during retries is intentional
            await self.wallet.sync_all()
            total_balance = await self.wallet.get_total_balance()
            logger.info(f"Wallet re-synced. Total balance: {total_balance:,} sats")

            self.current_offers = await self.offer_manager.create_offers()

        if not self.current_offers:
            logger.error(
                f"No offers created after {max_retries} retries. "
                "Please fund the wallet and restart."
            )
            return

        # Set up ephemeral hidden service via Tor control port if enabled
        # This must happen before connecting to directory servers so we can
        # advertise the onion address
        ephemeral_onion = await self._setup_tor_hidden_service()
        if ephemeral_onion:
            # Override onion_host with the dynamically created one
            # (object.__setattr__ bypasses pydantic's assignment handling
            # on the config model)
            object.__setattr__(self.config, "onion_host", ephemeral_onion)
            logger.info(f"Using ephemeral onion address: {ephemeral_onion}")

        # Determine the onion address to advertise
        onion_host = self.config.onion_host

        logger.info("Connecting to directory servers...")
        for dir_server in self.config.directory_servers:
            try:
                parts = dir_server.split(":")
                host = parts[0]
                port = int(parts[1]) if len(parts) > 1 else 5222

                # Determine location for handshake:
                # If we have an onion_host configured (static or ephemeral),
                # advertise it with port
                # Otherwise, use NOT-SERVING-ONION
                if onion_host:
                    location = f"{onion_host}:{self.config.onion_serving_port}"
                else:
                    location = "NOT-SERVING-ONION"

                # Advertise neutrino_compat if our backend can provide extended UTXO metadata.
                # This tells Neutrino takers that we can provide scriptpubkey and blockheight.
                # Full nodes (Bitcoin Core) can provide this; light clients (Neutrino) cannot.
                neutrino_compat = self.backend.can_provide_neutrino_metadata()

                # Create DirectoryClient with SOCKS config for Tor connections
                client = DirectoryClient(
                    host=host,
                    port=port,
                    network=self.config.network.value,
                    nick_identity=self.nick_identity,
                    location=location,
                    socks_host=self.config.socks_host,
                    socks_port=self.config.socks_port,
                    neutrino_compat=neutrino_compat,
                )

                await client.connect()
                node_id = f"{host}:{port}"
                self.directory_clients[node_id] = client

                logger.info(f"Connected to directory: {dir_server}")

            except Exception as e:
                # A single unreachable directory is tolerated; we only fail
                # below if none connected at all
                logger.error(f"Failed to connect to {dir_server}: {e}")

        if not self.directory_clients:
            logger.error("Failed to connect to any directory server")
            return

        # Start hidden service listener if we have an onion address (static or ephemeral)
        if onion_host:
            logger.info(
                f"Starting hidden service listener on "
                f"{self.config.onion_serving_host}:{self.config.onion_serving_port}..."
            )
            self.hidden_service_listener = HiddenServiceListener(
                host=self.config.onion_serving_host,
                port=self.config.onion_serving_port,
                on_connection=self._on_direct_connection,
            )
            await self.hidden_service_listener.start()
            logger.info(f"Hidden service listener started (onion: {onion_host})")

        logger.info("Announcing offers...")
        await self._announce_offers()

        logger.info("Maker bot started. Listening for takers...")
        self.running = True

        # Start listening on all directory clients
        for node_id, client in self.directory_clients.items():
            task = asyncio.create_task(self._listen_client(node_id, client))
            self.listen_tasks.append(task)

        # If hidden service listener is running, start serve_forever task
        if self.hidden_service_listener:
            task = asyncio.create_task(self.hidden_service_listener.serve_forever())
            self.listen_tasks.append(task)

        # Start background task to monitor pending transactions
        monitor_task = asyncio.create_task(self._monitor_pending_transactions())
        self.listen_tasks.append(monitor_task)

        # Start periodic wallet rescan task
        rescan_task = asyncio.create_task(self._periodic_rescan())
        self.listen_tasks.append(rescan_task)

        # Start periodic rate limit status logging task
        status_task = asyncio.create_task(self._periodic_rate_limit_status())
        self.listen_tasks.append(status_task)

        # Start periodic directory connection status logging task
        conn_status_task = asyncio.create_task(self._periodic_directory_connection_status())
        self.listen_tasks.append(conn_status_task)

        # Wait for all listening tasks to complete
        # (return_exceptions=True keeps one failing task from tearing
        # down the gather before the others finish)
        await asyncio.gather(*self.listen_tasks, return_exceptions=True)

    except Exception as e:
        logger.error(f"Failed to start maker bot: {e}")
        raise

Start the maker bot.

Flow: 1. Initialize commitment blacklist 2. Sync wallet with blockchain 3. Create ephemeral hidden service if tor_control enabled 4. Connect to directory servers 5. Create and announce offers 6. Listen for taker requests

async def stop(self) ‑> None
Expand source code
async def stop(self) -> None:
    """Shut the maker bot down and release every held resource.

    Cancels background tasks, tears down the hidden service and Tor
    control state, then closes peer connections, directory clients and
    the wallet, in that order.
    """
    logger.info("Stopping maker bot...")
    self.running = False

    # Cancel background tasks and wait for them all to unwind
    for pending in self.listen_tasks:
        pending.cancel()
    if self.listen_tasks:
        await asyncio.gather(*self.listen_tasks, return_exceptions=True)

    # Tear down the hidden service listener, if one was started
    listener = self.hidden_service_listener
    if listener:
        await listener.stop()

    # Dropping the Tor control connection removes the ephemeral onion
    await self._cleanup_tor_hidden_service()

    # Best-effort close of direct taker connections
    for connection in self.direct_connections.values():
        try:
            await connection.close()
        except Exception:
            pass
    self.direct_connections.clear()

    # Best-effort close of directory server clients
    for directory in self.directory_clients.values():
        try:
            await directory.close()
        except Exception:
            pass

    await self.wallet.close()
    logger.info("Maker bot stopped")

Stop the maker bot: cancel listener tasks, tear down the hidden service, and close all connections and the wallet.

class MakerConfig (**data: Any)
Expand source code
class MakerConfig(WalletConfig):
    """
    Configuration for maker bot.

    Inherits base wallet configuration from jmcore.config.WalletConfig
    and adds maker-specific settings for offers, hidden services, and
    UTXO selection.
    """

    # Hidden service configuration for direct peer connections
    # If onion_host is set, maker will serve on a hidden service
    # If tor_control is enabled and onion_host is None, it will be auto-generated
    onion_host: str | None = Field(
        default=None, description="Hidden service address (e.g., 'mymaker...onion')"
    )
    onion_serving_host: str = Field(
        default="127.0.0.1", description="Local address Tor forwards to"
    )
    onion_serving_port: int = Field(
        default=5222, ge=0, le=65535, description="Default JoinMarket port (0 = auto-assign)"
    )

    # Tor control port configuration for dynamic hidden service creation
    tor_control: TorControlConfig = Field(
        default_factory=TorControlConfig,
        description="Tor control port configuration",
    )

    # Offer configuration
    offer_type: OfferType = Field(
        default=OfferType.SW0_RELATIVE, description="Offer type (relative/absolute fee)"
    )
    min_size: int = Field(default=100_000, ge=0, description="Minimum CoinJoin amount in satoshis")
    cj_fee_relative: str = Field(default="0.001", description="Relative CJ fee (0.001 = 0.1%)")
    cj_fee_absolute: int = Field(default=500, ge=0, description="Absolute CJ fee in satoshis")
    tx_fee_contribution: int = Field(
        default=0, ge=0, description="Transaction fee contribution in satoshis"
    )

    # Minimum confirmations for UTXOs
    min_confirmations: int = Field(default=1, ge=0, description="Minimum confirmations for UTXOs")

    # Fidelity bond configuration
    # List of locktimes (Unix timestamps) to scan for fidelity bonds
    # These should match locktimes used when creating bond UTXOs
    fidelity_bond_locktimes: list[int] = Field(
        default_factory=list, description="List of locktimes to scan for fidelity bonds"
    )

    # Manual fidelity bond specification (bypasses registry)
    # Use this when you don't have a registry or want to specify a bond directly
    fidelity_bond_index: int | None = Field(
        default=None, description="Fidelity bond derivation index (bypasses registry)"
    )

    # Selected fidelity bond (txid, vout) - if not set, largest bond is used automatically
    selected_fidelity_bond: tuple[str, int] | None = Field(
        default=None, description="Selected fidelity bond UTXO (txid, vout)"
    )

    # Timeouts
    session_timeout_sec: int = Field(
        default=300,
        ge=60,
        description="Maximum time for a CoinJoin session to complete (all states)",
    )

    # Wallet rescan configuration
    post_coinjoin_rescan_delay: int = Field(
        default=60,
        ge=5,
        description="Seconds to wait before rescanning wallet after CoinJoin completion",
    )
    rescan_interval_sec: int = Field(
        default=600,
        ge=60,
        description="Interval in seconds for periodic wallet rescans (default: 10 minutes)",
    )

    # UTXO merge algorithm - how many UTXOs to use
    merge_algorithm: MergeAlgorithm = Field(
        default=MergeAlgorithm.DEFAULT,
        description=(
            "UTXO selection strategy: default (minimum), gradual (+1), "
            "greedy (all), random (0-2 extra)"
        ),
    )

    # Generic message rate limiting (protects against spam/DoS)
    message_rate_limit: int = Field(
        default=10,
        ge=1,
        description="Maximum messages per second per peer (sustained)",
    )
    message_burst_limit: int = Field(
        default=100,
        ge=1,
        description="Maximum burst messages per peer (default: 100, allows ~10s at max rate)",
    )

    # Rate limiting for orderbook requests (protects against spam attacks)
    orderbook_rate_limit: int = Field(
        default=1,
        ge=1,
        description="Maximum orderbook responses per peer per interval",
    )
    orderbook_rate_interval: float = Field(
        default=10.0,
        ge=1.0,
        description="Interval in seconds for orderbook rate limiting (default: 10s)",
    )
    orderbook_violation_ban_threshold: int = Field(
        default=100,
        ge=1,
        description="Ban peer after this many rate limit violations",
    )
    orderbook_violation_warning_threshold: int = Field(
        default=10,
        ge=1,
        description="Start exponential backoff after this many violations",
    )
    orderbook_violation_severe_threshold: int = Field(
        default=50,
        ge=1,
        description="Severe backoff threshold (higher penalty)",
    )
    orderbook_ban_duration: float = Field(
        default=3600.0,
        ge=60.0,
        description="Ban duration in seconds (default: 1 hour)",
    )

    model_config = {"frozen": False}

    @model_validator(mode="after")
    def validate_config(self) -> MakerConfig:
        """Validate configuration after initialization.

        Defaults bitcoin_network to the handshake network when unset, and
        for relative offer types checks that cj_fee_relative parses to a
        positive number.

        Raises:
            ValueError: if cj_fee_relative is not a valid number, or is
                not > 0 for a relative offer type.
        """
        # Set bitcoin_network default (handled by parent WalletConfig)
        if self.bitcoin_network is None:
            object.__setattr__(self, "bitcoin_network", self.network)

        # Validate cj_fee_relative for relative offer types.
        # The conversion is isolated in its own try block so we do not have
        # to disambiguate "bad number" from "out of range" by inspecting the
        # exception message (which is a CPython implementation detail).
        if self.offer_type in (OfferType.SW0_RELATIVE, OfferType.SWA_RELATIVE):
            try:
                cj_fee_float = float(self.cj_fee_relative)
            except ValueError as e:
                raise ValueError(
                    f"cj_fee_relative must be a valid number, got {self.cj_fee_relative}"
                ) from e
            if cj_fee_float <= 0:
                raise ValueError(
                    f"cj_fee_relative must be > 0 for relative offer types, "
                    f"got {self.cj_fee_relative}"
                )

        return self

Configuration for maker bot.

Inherits base wallet configuration from jmcore.config.WalletConfig and adds maker-specific settings for offers, hidden services, and UTXO selection.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

Class variables

var cj_fee_absolute : int

Absolute CJ fee in satoshis.

var cj_fee_relative : str

Relative CJ fee (0.001 = 0.1%).

var fidelity_bond_index : int | None

The type of the None singleton.

var fidelity_bond_locktimes : list[int]

The type of the None singleton.

var merge_algorithm : MergeAlgorithm

UTXO selection strategy: default (minimum), gradual (+1), greedy (all), random (0-2 extra).

var message_burst_limit : int

The type of the None singleton.

var message_rate_limit : int

The type of the None singleton.

var min_confirmations : int

The type of the None singleton.

var min_size : int

The type of the None singleton.

var offer_type : OfferType

Offer type (relative/absolute fee).

var onion_host : str | None

The type of the None singleton.

var onion_serving_host : str

The type of the None singleton.

var onion_serving_port : int

The type of the None singleton.

var orderbook_ban_duration : float

The type of the None singleton.

var orderbook_rate_interval : float

The type of the None singleton.

var orderbook_rate_limit : int

The type of the None singleton.

var orderbook_violation_ban_threshold : int

The type of the None singleton.

var orderbook_violation_severe_threshold : int

The type of the None singleton.

var orderbook_violation_warning_threshold : int

The type of the None singleton.

var post_coinjoin_rescan_delay : int

The type of the None singleton.

var rescan_interval_sec : int

The type of the None singleton.

var selected_fidelity_bond : tuple[str, int] | None

The type of the None singleton.

var session_timeout_sec : int

The type of the None singleton.

var tor_control : TorControlConfig

Tor control port configuration.

var tx_fee_contribution : int

The type of the None singleton.

Methods

def validate_config(self) ‑> MakerConfig
Expand source code
@model_validator(mode="after")
def validate_config(self) -> MakerConfig:
    """Validate configuration after initialization.

    Defaults bitcoin_network to the handshake network when unset, and
    for relative offer types checks that cj_fee_relative parses to a
    positive number.

    Raises:
        ValueError: if cj_fee_relative is not a valid number, or is
            not > 0 for a relative offer type.
    """
    # Set bitcoin_network default (handled by parent WalletConfig)
    if self.bitcoin_network is None:
        object.__setattr__(self, "bitcoin_network", self.network)

    # Validate cj_fee_relative for relative offer types.
    # The conversion is isolated in its own try block so we do not have
    # to disambiguate "bad number" from "out of range" by inspecting the
    # exception message (which is a CPython implementation detail).
    if self.offer_type in (OfferType.SW0_RELATIVE, OfferType.SWA_RELATIVE):
        try:
            cj_fee_float = float(self.cj_fee_relative)
        except ValueError as e:
            raise ValueError(
                f"cj_fee_relative must be a valid number, got {self.cj_fee_relative}"
            ) from e
        if cj_fee_float <= 0:
            raise ValueError(
                f"cj_fee_relative must be > 0 for relative offer types, "
                f"got {self.cj_fee_relative}"
            )

    return self

Validate configuration after initialization.

Inherited members