Module jmwallet.backends

Blockchain backend implementations.

Available backends:

- BitcoinCoreBackend: Full node via Bitcoin Core RPC (no wallet, uses scantxoutset)
- DescriptorWalletBackend: Full node with descriptor wallet (uses importdescriptors + listunspent)
- NeutrinoBackend: Lightweight BIP157/BIP158 SPV client
- MempoolBackend: Mempool.space API (third-party, no setup required)

Backend Selection Guide:

- DescriptorWalletBackend (Recommended): Fastest for ongoing operations. Uses Bitcoin Core's descriptor wallet feature to track UTXOs automatically. Requires one-time descriptor import.
- BitcoinCoreBackend: No wallet setup needed, but slower due to full UTXO set scans via scantxoutset. Good for simple scripts or one-off operations.
- NeutrinoBackend: Lightweight client for limited storage environments.

Neutrino Compatibility: All backends support verify_utxo_with_metadata() for Neutrino-compatible UTXO verification. Check backend.requires_neutrino_metadata() to determine if the backend needs scriptPubKey/blockheight hints from peers.
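The capability check described above might be used roughly as follows. This is only a sketch: `FullNodeStub` and `LightClientStub` are hypothetical stand-ins rather than classes from this package, and the return values shown for `requires_neutrino_metadata()` are assumptions about how full-node and light-client backends would typically answer.

```python
class FullNodeStub:
    """Hypothetical stand-in for a full-node backend (not part of jmwallet)."""

    def requires_neutrino_metadata(self) -> bool:
        # Assumption: a full node can look up any UTXO on its own.
        return False


class LightClientStub:
    """Hypothetical stand-in for a Neutrino-style backend (not part of jmwallet)."""

    def requires_neutrino_metadata(self) -> bool:
        # Assumption: a light client needs hints to locate the UTXO.
        return True


def describe_verification(backend) -> str:
    """Report what the caller must collect before verifying a peer's UTXO."""
    if backend.requires_neutrino_metadata():
        return "needs scriptPubKey and block height hints from the peer"
    return "can verify from txid:vout alone"


print(describe_verification(FullNodeStub()))
print(describe_verification(LightClientStub()))
```

Branching on the capability method rather than on the backend class keeps calling code independent of which backend is configured.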

Sub-modules

jmwallet.backends.base

Base blockchain backend interface.

jmwallet.backends.bitcoin_core

Bitcoin Core RPC blockchain backend. Uses RPC calls but NOT wallet functionality (no BDB dependency).

jmwallet.backends.descriptor_wallet

Bitcoin Core Descriptor Wallet backend …

jmwallet.backends.mempool

Mempool.space API blockchain backend. Beginner-friendly backend that requires no setup.

jmwallet.backends.neutrino

Neutrino (BIP157/BIP158) light client blockchain backend …

Functions

def generate_wallet_name(mnemonic_fingerprint: str, network: str = 'mainnet') ‑> str
def generate_wallet_name(mnemonic_fingerprint: str, network: str = "mainnet") -> str:
    """
    Generate a deterministic wallet name from mnemonic fingerprint.

    This ensures the same mnemonic always uses the same wallet, avoiding
    duplicate wallet creation.

    Args:
        mnemonic_fingerprint: First 8 chars of SHA256(mnemonic)
        network: Network name (mainnet, testnet, regtest)

    Returns:
        Wallet name like "jm_abc12345_mainnet"
    """
    return f"jm_{mnemonic_fingerprint}_{network}"

Generate a deterministic wallet name from mnemonic fingerprint.

This ensures the same mnemonic always uses the same wallet, avoiding duplicate wallet creation.

Args

mnemonic_fingerprint
First 8 chars of SHA256(mnemonic)
network
Network name (mainnet, testnet, regtest)

Returns

Wallet name like "jm_abc12345_mainnet"
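As a quick illustration, the function can be reproduced standalone to show the names it yields per network. The fingerprint `abc12345` is the placeholder from the docstring, not a real fingerprint.

```python
def generate_wallet_name(mnemonic_fingerprint: str, network: str = "mainnet") -> str:
    # Same f-string as in the source listing for this function.
    return f"jm_{mnemonic_fingerprint}_{network}"


# One name per network for the same (placeholder) fingerprint.
names = {
    net: generate_wallet_name("abc12345", net)
    for net in ("mainnet", "testnet", "regtest")
}
for net, name in names.items():
    print(net, name)
```

Because the name depends only on the fingerprint and the network, repeated calls with the same inputs always select the same wallet.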

def get_mnemonic_fingerprint(mnemonic: str, passphrase: str = '') ‑> str
def get_mnemonic_fingerprint(mnemonic: str, passphrase: str = "") -> str:
    """
    Get BIP32 master key fingerprint from mnemonic (like SeedSigner).

    This creates the master HD key from the seed and derives m/0 to get
    the fingerprint, following the same approach as SeedSigner and other
    Bitcoin wallet software.

    Args:
        mnemonic: BIP39 mnemonic phrase
        passphrase: Optional BIP39 passphrase (13th/25th word)

    Returns:
        8-character hex string (4 bytes) of the m/0 fingerprint
    """
    from jmwallet.wallet.bip32 import HDKey, mnemonic_to_seed

    # Convert mnemonic to seed bytes
    seed = mnemonic_to_seed(mnemonic, passphrase)

    # Create master HD key from seed
    root = HDKey.from_seed(seed)

    # Derive m/0 child key (following SeedSigner approach)
    child = root.derive("m/0")

    # Get fingerprint (4 bytes)
    fingerprint_bytes = child.fingerprint

    # Convert to 8-character hex string
    return fingerprint_bytes.hex()

Get BIP32 master key fingerprint from mnemonic (like SeedSigner).

This creates the master HD key from the seed and derives m/0 to get the fingerprint, following the same approach as SeedSigner and other Bitcoin wallet software.

Args

mnemonic
BIP39 mnemonic phrase
passphrase
Optional BIP39 passphrase (13th/25th word)

Returns

8-character hex string (4 bytes) of the m/0 fingerprint

Classes

class BitcoinCoreBackend (rpc_url: str = 'http://127.0.0.1:18443',
rpc_user: str = 'rpcuser',
rpc_password: str = 'rpcpassword',
scan_timeout: float = 300.0)
class BitcoinCoreBackend(BlockchainBackend):
    """
    Blockchain backend using Bitcoin Core RPC.
    Does NOT use Bitcoin Core wallet (avoids BDB issues).
    Uses scantxoutset and other non-wallet RPC methods.
    """

    def __init__(
        self,
        rpc_url: str = "http://127.0.0.1:18443",
        rpc_user: str = "rpcuser",
        rpc_password: str = "rpcpassword",
        scan_timeout: float = SCAN_RPC_TIMEOUT,
    ):
        self.rpc_url = rpc_url.rstrip("/")
        self.rpc_user = rpc_user
        self.rpc_password = rpc_password
        self.scan_timeout = scan_timeout
        # Client for regular RPC calls
        self.client = httpx.AsyncClient(timeout=DEFAULT_RPC_TIMEOUT, auth=(rpc_user, rpc_password))
        # Separate client for long-running scans
        self._scan_client = httpx.AsyncClient(timeout=scan_timeout, auth=(rpc_user, rpc_password))
        self._request_id = 0

    async def _rpc_call(
        self,
        method: str,
        params: list | None = None,
        client: httpx.AsyncClient | None = None,
    ) -> Any:
        """
        Make an RPC call to Bitcoin Core.

        Args:
            method: RPC method name
            params: Method parameters
            client: Optional httpx client (uses default client if not provided)

        Returns:
            RPC result

        Raises:
            ValueError: On RPC errors
            httpx.HTTPError: On connection/timeout errors
        """
        self._request_id += 1
        payload = {
            "jsonrpc": "2.0",
            "id": self._request_id,
            "method": method,
            "params": params or [],
        }

        use_client = client or self.client

        try:
            response = await use_client.post(self.rpc_url, json=payload)
            response.raise_for_status()
            data = response.json()

            if "error" in data and data["error"]:
                error_info = data["error"]
                error_code = error_info.get("code", "unknown")
                error_msg = error_info.get("message", str(error_info))
                raise ValueError(f"RPC error {error_code}: {error_msg}")

            return data.get("result")

        except httpx.TimeoutException as e:
            logger.error(f"RPC call timed out: {method} - {e}")
            raise
        except httpx.HTTPError as e:
            logger.error(f"RPC call failed: {method} - {e}")
            raise

    async def _scantxoutset_with_retry(
        self, descriptors: Sequence[str | dict[str, Any]]
    ) -> dict[str, Any] | None:
        """
        Execute scantxoutset with retry logic for handling concurrent scan conflicts.

        Bitcoin Core only allows one scantxoutset at a time. This method:
        1. Checks if a scan is already in progress
        2. If so, waits for it to complete (via status polling) before starting ours
        3. Starts our scan with extended timeout for mainnet

        Args:
            descriptors: List of output descriptors to scan for. Can be:
                - Simple strings: "addr(bc1q...)"
                - Dicts with range: {"desc": "wpkh([fp/84'/0'/0'/0/*)", "range": [0, 999]}

        Returns:
            Scan result dict or None if all retries failed
        """
        for attempt in range(SCAN_MAX_RETRIES):
            try:
                # First check if a scan is already running
                status = await self._rpc_call("scantxoutset", ["status"])
                if status is not None:
                    # A scan is in progress - wait for it
                    # Bitcoin Core returns progress as 0-100, not 0-1
                    progress = status.get("progress", 0) / 100.0
                    logger.debug(
                        f"Another scan in progress ({progress:.1%}), waiting... "
                        f"(attempt {attempt + 1}/{SCAN_MAX_RETRIES})"
                    )
                    if attempt < SCAN_MAX_RETRIES - 1:
                        await asyncio.sleep(SCAN_STATUS_POLL_INTERVAL)
                        continue

                # Start our scan with extended timeout
                logger.debug(f"Starting UTXO scan for {len(descriptors)} descriptor(s)...")
                if SENSITIVE_LOGGING:
                    logger.debug(f"Descriptors for scan: {descriptors}")
                result = await self._rpc_call(
                    "scantxoutset", ["start", descriptors], client=self._scan_client
                )
                if result:
                    unspent_count = len(result.get("unspents", []))
                    total_amount = result.get("total_amount", 0)
                    logger.debug(
                        f"Scan completed: found {unspent_count} UTXOs, total {total_amount:.8f} BTC"
                    )
                    if SENSITIVE_LOGGING and unspent_count > 0:
                        logger.debug(f"Scan result: {result}")
                return result

            except ValueError as e:
                error_str = str(e)
                # Check for "scan already in progress" error (code -8)
                if "code': -8" in error_str or "Scan already in progress" in error_str:
                    if attempt < SCAN_MAX_RETRIES - 1:
                        delay = SCAN_BASE_DELAY * (2**attempt) + random.uniform(0, 0.5)
                        logger.debug(
                            f"Scan in progress (RPC error), retrying in {delay:.2f}s "
                            f"(attempt {attempt + 1}/{SCAN_MAX_RETRIES})"
                        )
                        await asyncio.sleep(delay)
                        continue
                    else:
                        logger.warning(
                            f"Max retries ({SCAN_MAX_RETRIES}) exceeded waiting for scan slot"
                        )
                        return None
                else:
                    # Other RPC errors - log and re-raise
                    logger.error(f"scantxoutset RPC error: {error_str}")
                    raise

            except httpx.TimeoutException:
                # Timeout during scan - this is a real failure on mainnet
                logger.error(
                    f"scantxoutset timed out after {self.scan_timeout}s. "
                    "Try increasing scan_timeout for mainnet."
                )
                return None

            except Exception as e:
                logger.error(f"Unexpected error during scantxoutset: {type(e).__name__}: {e}")
                raise

        logger.warning(f"scantxoutset failed after {SCAN_MAX_RETRIES} attempts")
        return None

    async def get_utxos(self, addresses: list[str]) -> list[UTXO]:
        utxos: list[UTXO] = []
        if not addresses:
            return utxos

        # Get tip height once for confirmation calculation
        try:
            tip_height = await self.get_block_height()
        except Exception as e:
            logger.error(f"Failed to get block height for UTXO scan: {e}")
            return utxos

        # Process in batches to avoid huge RPC requests
        batch_size: int = 1000
        for i in range(0, len(addresses), batch_size):
            chunk = addresses[i : i + batch_size]
            descriptors = [f"addr({addr})" for addr in chunk]
            if SENSITIVE_LOGGING:
                logger.debug(f"Scanning addresses batch {i // batch_size + 1}: {chunk}")

            try:
                # Scan for all addresses in this chunk at once (with retry for conflicts)
                result = await self._scantxoutset_with_retry(descriptors)

                if not result or "unspents" not in result:
                    continue

                for utxo_data in result["unspents"]:
                    confirmations = 0
                    if utxo_data.get("height", 0) > 0:
                        confirmations = tip_height - utxo_data["height"] + 1

                    # Extract address from descriptor "addr(ADDRESS)#checksum" or "addr(ADDRESS)"
                    desc = utxo_data.get("desc", "")
                    # Remove checksum if present
                    if "#" in desc:
                        desc = desc.split("#")[0]

                    address = ""
                    if desc.startswith("addr(") and desc.endswith(")"):
                        address = desc[5:-1]
                    else:
                        # Only log warning if we really can't parse it (and it's not empty)
                        if desc:
                            logger.warning(f"Failed to parse address from descriptor: '{desc}'")

                    utxo = UTXO(
                        txid=utxo_data["txid"],
                        vout=utxo_data["vout"],
                        value=btc_to_sats(utxo_data["amount"]),
                        address=address,
                        confirmations=confirmations,
                        scriptpubkey=utxo_data.get("scriptPubKey", ""),
                        height=utxo_data.get("height"),
                    )
                    utxos.append(utxo)

                logger.debug(
                    f"Scanned {len(chunk)} addresses, found {len(result['unspents'])} UTXOs"
                )

            except Exception as e:
                logger.warning(f"Failed to scan UTXOs for batch starting {chunk[0]}: {e}")
                continue

        return utxos

    async def scan_descriptors(
        self, descriptors: Sequence[str | dict[str, Any]]
    ) -> dict[str, Any] | None:
        """
        Scan the UTXO set using output descriptors.

        This is much more efficient than scanning individual addresses,
        especially for HD wallets where you can use xpub descriptors with
        ranges to scan thousands of addresses in a single UTXO set pass.

        Example descriptors:
            - "addr(bc1q...)" - single address
            - "wpkh(xpub.../0/*)" - HD wallet external addresses (default range 0-1000)
            - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range

        Args:
            descriptors: List of output descriptors (strings or dicts with range)

        Returns:
            Raw scan result dict from Bitcoin Core, or None on failure.
            Result includes:
                - success: bool
                - txouts: number of UTXOs scanned
                - height: current block height
                - unspents: list of found UTXOs with txid, vout, scriptPubKey,
                            desc (matched descriptor), amount, height
                - total_amount: sum of all found UTXOs
        """
        if not descriptors:
            return {"success": True, "unspents": [], "total_amount": 0}

        logger.info(f"Starting descriptor scan with {len(descriptors)} descriptor(s)...")
        result = await self._scantxoutset_with_retry(descriptors)

        if result:
            unspent_count = len(result.get("unspents", []))
            total = result.get("total_amount", 0)
            logger.info(
                f"Descriptor scan complete: found {unspent_count} UTXOs, total {total:.8f} BTC"
            )
        else:
            logger.warning("Descriptor scan failed or returned no results")

        return result

    async def get_address_balance(self, address: str) -> int:
        utxos = await self.get_utxos([address])
        balance = sum(utxo.value for utxo in utxos)
        logger.debug(f"Balance for {address}: {balance} sats")
        return balance

    async def broadcast_transaction(self, tx_hex: str) -> str:
        try:
            txid = await self._rpc_call("sendrawtransaction", [tx_hex])
            logger.info(f"Broadcast transaction: {txid}")
            return txid

        except Exception as e:
            logger.error(f"Failed to broadcast transaction: {e}")
            raise ValueError(f"Broadcast failed: {e}") from e

    async def get_transaction(self, txid: str) -> Transaction | None:
        try:
            tx_data = await self._rpc_call("getrawtransaction", [txid, True])

            if not tx_data:
                return None

            confirmations = tx_data.get("confirmations", 0)
            block_height = None
            block_time = None

            if "blockhash" in tx_data:
                block_info = await self._rpc_call("getblockheader", [tx_data["blockhash"]])
                block_height = block_info.get("height")
                block_time = block_info.get("time")

            raw_hex = tx_data.get("hex", "")

            return Transaction(
                txid=txid,
                raw=raw_hex,
                confirmations=confirmations,
                block_height=block_height,
                block_time=block_time,
            )

        except Exception as e:
            logger.warning(f"Failed to fetch transaction {txid}: {e}")
            return None

    async def estimate_fee(self, target_blocks: int) -> float:
        try:
            result = await self._rpc_call("estimatesmartfee", [target_blocks])

            if "feerate" in result:
                btc_per_kb = result["feerate"]
                # Convert BTC/kB to sat/vB (keep precision for sub-sat rates)
                sat_per_vbyte = btc_to_sats(btc_per_kb) / 1000
                logger.debug(f"Estimated fee for {target_blocks} blocks: {sat_per_vbyte} sat/vB")
                return sat_per_vbyte
            else:
                logger.warning("Fee estimation unavailable, using fallback")
                return 1.0

        except Exception as e:
            logger.warning(f"Failed to estimate fee: {e}, using fallback")
            return 1.0

    async def get_mempool_min_fee(self) -> float | None:
        """Get the minimum fee rate (in sat/vB) for transaction to be accepted into mempool.

        Returns:
            Minimum fee rate in sat/vB, or None if unavailable.
        """
        try:
            result = await self._rpc_call("getmempoolinfo", [])
            if "mempoolminfee" in result:
                btc_per_kb = result["mempoolminfee"]
                # Convert BTC/kB to sat/vB
                sat_per_vbyte = btc_to_sats(btc_per_kb) / 1000
                logger.debug(f"Mempool min fee: {sat_per_vbyte} sat/vB")
                return sat_per_vbyte
            return None
        except Exception as e:
            logger.debug(f"Failed to get mempool min fee: {e}")
            return None

    async def get_block_height(self) -> int:
        try:
            info = await self._rpc_call("getblockchaininfo", [])
            height = info.get("blocks", 0)
            logger.debug(f"Current block height: {height}")
            return height

        except Exception as e:
            logger.error(f"Failed to fetch block height: {e}")
            raise

    async def get_block_time(self, block_height: int) -> int:
        try:
            block_hash = await self.get_block_hash(block_height)
            block_header = await self._rpc_call("getblockheader", [block_hash])
            timestamp = block_header.get("time", 0)
            logger.debug(f"Block {block_height} timestamp: {timestamp}")
            return timestamp

        except Exception as e:
            logger.error(f"Failed to fetch block time for height {block_height}: {e}")
            raise

    async def get_block_hash(self, block_height: int) -> str:
        try:
            block_hash = await self._rpc_call("getblockhash", [block_height])
            logger.debug(f"Block hash for height {block_height}: {block_hash}")
            return block_hash

        except Exception as e:
            logger.error(f"Failed to fetch block hash for height {block_height}: {e}")
            raise

    async def get_utxo(self, txid: str, vout: int) -> UTXO | None:
        """Get a specific UTXO from the blockchain UTXO set using gettxout.
        Returns None if the UTXO does not exist or has been spent.

        If not found in confirmed UTXO set, checks mempool for unconfirmed transactions.
        """
        try:
            # gettxout returns None if UTXO doesn't exist or is spent
            # include_mempool=True checks both confirmed and unconfirmed outputs
            result = await self._rpc_call("gettxout", [txid, vout, True])

            if result is None:
                # Not found in UTXO set - check if it's in mempool (unconfirmed)
                logger.debug(
                    f"UTXO {txid}:{vout} not found in confirmed UTXO set, checking mempool..."
                )
                try:
                    # Get raw transaction from mempool
                    tx_data = await self._rpc_call("getrawtransaction", [txid, True])

                    if tx_data and "vout" in tx_data:
                        # Check if the vout exists and hasn't been spent
                        if vout < len(tx_data["vout"]):
                            vout_data = tx_data["vout"][vout]
                            value = btc_to_sats(vout_data.get("value", 0))

                            # Extract address from scriptPubKey
                            script_pub_key = vout_data.get("scriptPubKey", {})
                            address = script_pub_key.get("address", "")
                            # For multiple addresses (e.g., multisig), use the first one
                            if not address and "addresses" in script_pub_key:
                                addresses = script_pub_key.get("addresses", [])
                                address = addresses[0] if addresses else ""
                            scriptpubkey = script_pub_key.get("hex", "")

                            # Unconfirmed transaction has 0 confirmations
                            logger.info(f"Found UTXO {txid}:{vout} in mempool (unconfirmed)")
                            return UTXO(
                                txid=txid,
                                vout=vout,
                                value=value,
                                address=address,
                                confirmations=0,
                                scriptpubkey=scriptpubkey,
                                height=None,
                            )
                except Exception as mempool_err:
                    logger.debug(f"UTXO {txid}:{vout} not in mempool either: {mempool_err}")

                logger.debug(f"UTXO {txid}:{vout} not found (spent or doesn't exist)")
                return None

            # Get tip height for confirmation calculation
            tip_height = await self.get_block_height()

            confirmations = result.get("confirmations", 0)
            value = btc_to_sats(result.get("value", 0))  # BTC to sats

            # Extract address from scriptPubKey
            script_pub_key = result.get("scriptPubKey", {})
            address = script_pub_key.get("address", "")
            scriptpubkey = script_pub_key.get("hex", "")

            # Calculate height from confirmations
            height = None
            if confirmations > 0:
                height = tip_height - confirmations + 1

            return UTXO(
                txid=txid,
                vout=vout,
                value=value,
                address=address,
                confirmations=confirmations,
                scriptpubkey=scriptpubkey,
                height=height,
            )

        except Exception as e:
            logger.error(f"Failed to get UTXO {txid}:{vout}: {e}")
            return None

    def can_provide_neutrino_metadata(self) -> bool:
        """
        Bitcoin Core can provide Neutrino-compatible metadata.

        Full node can access scriptpubkey and blockheight for all UTXOs,
        allowing Neutrino takers to use our makers.

        Returns:
            True - Bitcoin Core always provides extended UTXO metadata
        """
        return True

    async def close(self) -> None:
        await self.client.aclose()
        await self._scan_client.aclose()

Blockchain backend using Bitcoin Core RPC. Does NOT use Bitcoin Core wallet (avoids BDB issues). Uses scantxoutset and other non-wallet RPC methods.
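The request and response handling in `_rpc_call` can be sketched without any network access. The helper names `build_payload` and `unwrap_result` are hypothetical and exist only for this illustration; the payload fields and the error message format follow the source listing above.

```python
import itertools
from typing import Any, Dict, Optional

# Monotonic request ids, mirroring the per-backend counter in the source.
_request_ids = itertools.count(1)


def build_payload(method: str, params: Optional[list] = None) -> Dict[str, Any]:
    """Build a JSON-RPC 2.0 request body for Bitcoin Core."""
    return {
        "jsonrpc": "2.0",
        "id": next(_request_ids),
        "method": method,
        "params": params or [],
    }


def unwrap_result(data: Dict[str, Any]) -> Any:
    """Return the RPC result, or raise ValueError if the response carries an error."""
    error_info = data.get("error")
    if error_info:
        code = error_info.get("code", "unknown")
        message = error_info.get("message", str(error_info))
        raise ValueError(f"RPC error {code}: {message}")
    return data.get("result")


payload = build_payload("scantxoutset", ["status"])
print(payload["method"], payload["params"])
print(unwrap_result({"result": {"blocks": 100}, "error": None}))
```

Errors surface as `ValueError` with text of the form `RPC error -8: Scan already in progress`, so code that matches on the message should expect that shape.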

Ancestors

BlockchainBackend
abc.ABC

Methods

def can_provide_neutrino_metadata(self) ‑> bool
def can_provide_neutrino_metadata(self) -> bool:
    """
    Bitcoin Core can provide Neutrino-compatible metadata.

    Full node can access scriptpubkey and blockheight for all UTXOs,
    allowing Neutrino takers to use our makers.

    Returns:
        True - Bitcoin Core always provides extended UTXO metadata
    """
    return True

Bitcoin Core can provide Neutrino-compatible metadata.

Full node can access scriptpubkey and blockheight for all UTXOs, allowing Neutrino takers to use our makers.

Returns

True - Bitcoin Core always provides extended UTXO metadata

async def get_mempool_min_fee(self) ‑> float | None
async def get_mempool_min_fee(self) -> float | None:
    """Get the minimum fee rate (in sat/vB) for transaction to be accepted into mempool.

    Returns:
        Minimum fee rate in sat/vB, or None if unavailable.
    """
    try:
        result = await self._rpc_call("getmempoolinfo", [])
        if "mempoolminfee" in result:
            btc_per_kb = result["mempoolminfee"]
            # Convert BTC/kB to sat/vB
            sat_per_vbyte = btc_to_sats(btc_per_kb) / 1000
            logger.debug(f"Mempool min fee: {sat_per_vbyte} sat/vB")
            return sat_per_vbyte
        return None
    except Exception as e:
        logger.debug(f"Failed to get mempool min fee: {e}")
        return None

Get the minimum fee rate (in sat/vB) for transaction to be accepted into mempool.

Returns

Minimum fee rate in sat/vB, or None if unavailable.
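The unit conversion is worth spelling out: Bitcoin Core reports `mempoolminfee` in BTC per 1000 virtual bytes, so the value is converted to satoshis and divided by 1000. The helper below is a hypothetical sketch; in particular, rounding to whole satoshis is an assumption about what `btc_to_sats` does, not something confirmed by the source.

```python
SATS_PER_BTC = 100_000_000


def btc_per_kb_to_sat_per_vb(btc_per_kb: float) -> float:
    """Convert a fee rate from BTC/kvB to sat/vB."""
    # Round to whole satoshis first so float noise does not leak into the rate
    # (assumed behaviour, standing in for the library's btc_to_sats).
    sats_per_kb = round(btc_per_kb * SATS_PER_BTC)
    return sats_per_kb / 1000


print(btc_per_kb_to_sat_per_vb(0.00001))    # a rate of 1000 sat/kvB
print(btc_per_kb_to_sat_per_vb(0.0000015))  # a sub-sat/vB rate
```

Dividing after the satoshi conversion keeps fractional results such as sub-1 sat/vB rates representable, which is why the method returns a float.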

async def get_utxo(self, txid: str, vout: int) ‑> UTXO | None
async def get_utxo(self, txid: str, vout: int) -> UTXO | None:
    """Get a specific UTXO from the blockchain UTXO set using gettxout.
    Returns None if the UTXO does not exist or has been spent.

    If not found in confirmed UTXO set, checks mempool for unconfirmed transactions.
    """
    try:
        # gettxout returns None if UTXO doesn't exist or is spent
        # include_mempool=True checks both confirmed and unconfirmed outputs
        result = await self._rpc_call("gettxout", [txid, vout, True])

        if result is None:
            # Not found in UTXO set - check if it's in mempool (unconfirmed)
            logger.debug(
                f"UTXO {txid}:{vout} not found in confirmed UTXO set, checking mempool..."
            )
            try:
                # Get raw transaction from mempool
                tx_data = await self._rpc_call("getrawtransaction", [txid, True])

                if tx_data and "vout" in tx_data:
                    # Check if the vout exists and hasn't been spent
                    if vout < len(tx_data["vout"]):
                        vout_data = tx_data["vout"][vout]
                        value = btc_to_sats(vout_data.get("value", 0))

                        # Extract address from scriptPubKey
                        script_pub_key = vout_data.get("scriptPubKey", {})
                        address = script_pub_key.get("address", "")
                        # For multiple addresses (e.g., multisig), use the first one
                        if not address and "addresses" in script_pub_key:
                            addresses = script_pub_key.get("addresses", [])
                            address = addresses[0] if addresses else ""
                        scriptpubkey = script_pub_key.get("hex", "")

                        # Unconfirmed transaction has 0 confirmations
                        logger.info(f"Found UTXO {txid}:{vout} in mempool (unconfirmed)")
                        return UTXO(
                            txid=txid,
                            vout=vout,
                            value=value,
                            address=address,
                            confirmations=0,
                            scriptpubkey=scriptpubkey,
                            height=None,
                        )
            except Exception as mempool_err:
                logger.debug(f"UTXO {txid}:{vout} not in mempool either: {mempool_err}")

            logger.debug(f"UTXO {txid}:{vout} not found (spent or doesn't exist)")
            return None

        # Get tip height for confirmation calculation
        tip_height = await self.get_block_height()

        confirmations = result.get("confirmations", 0)
        value = btc_to_sats(result.get("value", 0))  # BTC to sats

        # Extract address from scriptPubKey
        script_pub_key = result.get("scriptPubKey", {})
        address = script_pub_key.get("address", "")
        scriptpubkey = script_pub_key.get("hex", "")

        # Calculate height from confirmations
        height = None
        if confirmations > 0:
            height = tip_height - confirmations + 1

        return UTXO(
            txid=txid,
            vout=vout,
            value=value,
            address=address,
            confirmations=confirmations,
            scriptpubkey=scriptpubkey,
            height=height,
        )

    except Exception as e:
        logger.error(f"Failed to get UTXO {txid}:{vout}: {e}")
        return None

Get a specific UTXO from the blockchain UTXO set using gettxout. Returns None if the UTXO does not exist or has been spent.

If not found in confirmed UTXO set, checks mempool for unconfirmed transactions.
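The relationship between confirmations and block height used by this backend can be shown in isolation. The function names are hypothetical; the arithmetic matches the source, where a UTXO mined in the tip block has exactly one confirmation.

```python
from typing import Optional


def height_from_confirmations(tip_height: int, confirmations: int) -> Optional[int]:
    """Block height of a UTXO, or None if it is unconfirmed."""
    if confirmations <= 0:
        return None
    return tip_height - confirmations + 1


def confirmations_from_height(tip_height: int, height: int) -> int:
    """Inverse mapping: confirmations for a UTXO mined at the given height."""
    if height <= 0:
        return 0
    return tip_height - height + 1


tip = 800_000  # example tip height, chosen arbitrarily
print(height_from_confirmations(tip, 1))
print(height_from_confirmations(tip, 6))
print(confirmations_from_height(tip, 799_995))
```

The `+ 1` accounts for the block that contains the transaction, which itself counts as the first confirmation.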

async def scan_descriptors(self, descriptors: Sequence[str | dict[str, Any]]) ‑> dict[str, typing.Any] | None
async def scan_descriptors(
    self, descriptors: Sequence[str | dict[str, Any]]
) -> dict[str, Any] | None:
    """
    Scan the UTXO set using output descriptors.

    This is much more efficient than scanning individual addresses,
    especially for HD wallets where you can use xpub descriptors with
    ranges to scan thousands of addresses in a single UTXO set pass.

    Example descriptors:
        - "addr(bc1q...)" - single address
        - "wpkh(xpub.../0/*)" - HD wallet external addresses (default range 0-1000)
        - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range

    Args:
        descriptors: List of output descriptors (strings or dicts with range)

    Returns:
        Raw scan result dict from Bitcoin Core, or None on failure.
        Result includes:
            - success: bool
            - txouts: number of UTXOs scanned
            - height: current block height
            - unspents: list of found UTXOs with txid, vout, scriptPubKey,
                        desc (matched descriptor), amount, height
            - total_amount: sum of all found UTXOs
    """
    if not descriptors:
        return {"success": True, "unspents": [], "total_amount": 0}

    logger.info(f"Starting descriptor scan with {len(descriptors)} descriptor(s)...")
    result = await self._scantxoutset_with_retry(descriptors)

    if result:
        unspent_count = len(result.get("unspents", []))
        total = result.get("total_amount", 0)
        logger.info(
            f"Descriptor scan complete: found {unspent_count} UTXOs, total {total:.8f} BTC"
        )
    else:
        logger.warning("Descriptor scan failed or returned no results")

    return result

Scan the UTXO set using output descriptors.

This is much more efficient than scanning individual addresses, especially for HD wallets where you can use xpub descriptors with ranges to scan thousands of addresses in a single UTXO set pass.

Example descriptors:

- "addr(bc1q…)" - single address
- "wpkh(xpub…/0/*)" - HD wallet external addresses (default range 0-1000)
- {"desc": "wpkh(xpub…/0/*)", "range": [0, 999]} - explicit range
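A descriptor list mixing both accepted shapes could be assembled as in this sketch. The helpers `address_descriptors` and `ranged_descriptor` are hypothetical, and the address and xpub strings are elided placeholders copied from the docstring, so they would not be accepted by a real node as written.

```python
from typing import Any, Dict, List, Union

Descriptor = Union[str, Dict[str, Any]]


def address_descriptors(addresses: List[str]) -> List[Descriptor]:
    """Wrap plain addresses in addr(...) descriptors."""
    return [f"addr({address})" for address in addresses]


def ranged_descriptor(desc: str, start: int, end: int) -> Dict[str, Any]:
    """Attach an explicit derivation range to a wildcard descriptor."""
    return {"desc": desc, "range": [start, end]}


descriptors: List[Descriptor] = address_descriptors(["bc1q..."])
descriptors.append(ranged_descriptor("wpkh(xpub.../0/*)", 0, 999))
print(descriptors)
```

The resulting list is the kind of value that would be passed as the `descriptors` argument, for example `await backend.scan_descriptors(descriptors)`.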

Args

descriptors
List of output descriptors (strings or dicts with range)

Returns

Raw scan result dict from Bitcoin Core, or None on failure. Result includes:

- success: bool
- txouts: number of UTXOs scanned
- height: current block height
- unspents: list of found UTXOs with txid, vout, scriptPubKey, desc (matched descriptor), amount, height
- total_amount: sum of all found UTXOs
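Consuming the raw result might look like the sketch below. The `sample_result` dict is made-up illustration data shaped after the fields listed above, and `address_from_desc` and `summarize` are hypothetical helpers; the address parsing mirrors how `get_utxos` strips the `#checksum` suffix in the source.

```python
from typing import Any, Dict, Optional, Tuple


def address_from_desc(desc: str) -> str:
    """Extract ADDRESS from 'addr(ADDRESS)#checksum' or 'addr(ADDRESS)'."""
    desc = desc.split("#", 1)[0]  # drop the checksum if present
    if desc.startswith("addr(") and desc.endswith(")"):
        return desc[5:-1]
    return ""  # not an addr() descriptor


def summarize(result: Optional[Dict[str, Any]]) -> Tuple[int, int]:
    """Return (number of UTXOs, total value in satoshis); (0, 0) on failure."""
    if not result:
        return (0, 0)
    unspents = result.get("unspents", [])
    total_sats = sum(round(u["amount"] * 100_000_000) for u in unspents)
    return (len(unspents), total_sats)


# Made-up sample data; txids and addresses are placeholders.
sample_result = {
    "success": True,
    "height": 800_000,
    "unspents": [
        {"txid": "aa" * 32, "vout": 0, "desc": "addr(bc1qexample)#abcd1234",
         "amount": 0.001, "height": 799_990},
        {"txid": "bb" * 32, "vout": 1, "desc": "addr(bc1qother)",
         "amount": 0.0005, "height": 799_995},
    ],
    "total_amount": 0.0015,
}

print(summarize(sample_result))
print([address_from_desc(u["desc"]) for u in sample_result["unspents"]])
```

Since the method returns None on failure, callers should check for that before reading any of the fields.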

Inherited members

class BlockchainBackend
class BlockchainBackend(ABC):
    """
    Abstract blockchain backend interface.
    Implementations provide access to blockchain data without requiring
    Bitcoin Core wallet functionality (avoiding BerkeleyDB issues).
    """

    @abstractmethod
    async def get_utxos(self, addresses: list[str]) -> list[UTXO]:
        """Get UTXOs for given addresses"""

    @abstractmethod
    async def get_address_balance(self, address: str) -> int:
        """Get balance for an address in satoshis"""

    @abstractmethod
    async def broadcast_transaction(self, tx_hex: str) -> str:
        """Broadcast transaction, returns txid"""

    @abstractmethod
    async def get_transaction(self, txid: str) -> Transaction | None:
        """Get transaction by txid"""

    @abstractmethod
    async def estimate_fee(self, target_blocks: int) -> float:
        """Estimate fee in sat/vbyte for target confirmation blocks.

        Returns:
            Fee rate in sat/vB. Can be fractional (e.g., 0.5 sat/vB).
        """

    async def get_mempool_min_fee(self) -> float | None:
        """Get the minimum fee rate (in sat/vB) for transaction to be accepted into mempool.

        This is used as a floor for fee estimation to ensure transactions are
        relayed and accepted into the mempool. Returns None if not supported
        or unavailable (e.g., light clients).

        Returns:
            Minimum fee rate in sat/vB, or None if unavailable.
        """
        return None

    def can_estimate_fee(self) -> bool:
        """Check if this backend can perform fee estimation.

        Full node backends (Bitcoin Core) can estimate fees.
        Light client backends (Neutrino) typically cannot.

        Returns:
            True if backend supports fee estimation, False otherwise.
        """
        return True

    def has_mempool_access(self) -> bool:
        """Check if this backend can access unconfirmed transactions in the mempool.

        Full node backends (Bitcoin Core) and API backends (Mempool.space) have
        mempool access and can verify transactions immediately after broadcast.

        Light client backends (Neutrino using BIP157/158) cannot access the mempool
        and can only see transactions after they're confirmed in a block. This
        affects broadcast verification strategy - see BroadcastPolicy docs.

        Returns:
            True if backend can see unconfirmed transactions, False otherwise.
        """
        return True

    @abstractmethod
    async def get_block_height(self) -> int:
        """Get current blockchain height"""

    @abstractmethod
    async def get_block_time(self, block_height: int) -> int:
        """Get block time (unix timestamp) for given height"""

    @abstractmethod
    async def get_block_hash(self, block_height: int) -> str:
        """Get block hash for given height"""

    @abstractmethod
    async def get_utxo(self, txid: str, vout: int) -> UTXO | None:
        """Get a specific UTXO from the blockchain UTXO set (gettxout).
        Returns None if the UTXO does not exist or has been spent."""

    async def scan_descriptors(
        self, descriptors: Sequence[str | dict[str, Any]]
    ) -> dict[str, Any] | None:
        """
        Scan the UTXO set using output descriptors.

        This is an efficient alternative to scanning individual addresses,
        especially useful for HD wallets where xpub descriptors with ranges
        can scan thousands of addresses in a single UTXO set pass.

        Example descriptors:
            - "addr(bc1q...)" - single address
            - "wpkh(xpub.../0/*)" - HD wallet addresses (default range 0-1000)
            - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range

        Args:
            descriptors: List of output descriptors (strings or dicts with range)

        Returns:
            Scan result dict with:
                - success: bool
                - unspents: list of found UTXOs
                - total_amount: sum of all found UTXOs
            Returns None if not supported or on failure.

        Note:
            Not all backends support descriptor scanning. The default implementation
            returns None. Override in backends that support it (e.g., Bitcoin Core).
        """
        # Default: not supported
        return None

    async def verify_utxo_with_metadata(
        self,
        txid: str,
        vout: int,
        scriptpubkey: str,
        blockheight: int,
    ) -> UTXOVerificationResult:
        """
        Verify a UTXO using provided metadata (neutrino_compat feature).

        This method allows light clients to verify UTXOs without needing
        arbitrary blockchain queries by using metadata provided by the peer.

        The implementation should:
        1. Use scriptpubkey to add the UTXO to watch list (for Neutrino)
        2. Use blockheight as a hint for efficient rescan
        3. Verify the UTXO exists with matching scriptpubkey
        4. Return the UTXO value and confirmations

        Default implementation falls back to get_utxo() for full node backends.

        Args:
            txid: Transaction ID
            vout: Output index
            scriptpubkey: Expected scriptPubKey (hex)
            blockheight: Block height where UTXO was confirmed

        Returns:
            UTXOVerificationResult with verification status and UTXO data
        """
        # Default implementation for full node backends
        # Just uses get_utxo() directly since we can query any UTXO
        utxo = await self.get_utxo(txid, vout)

        if utxo is None:
            return UTXOVerificationResult(
                valid=False,
                error="UTXO not found or spent",
            )

        # Verify scriptpubkey matches
        scriptpubkey_matches = utxo.scriptpubkey.lower() == scriptpubkey.lower()

        if not scriptpubkey_matches:
            return UTXOVerificationResult(
                valid=False,
                value=utxo.value,
                confirmations=utxo.confirmations,
                error="ScriptPubKey mismatch",
                scriptpubkey_matches=False,
            )

        return UTXOVerificationResult(
            valid=True,
            value=utxo.value,
            confirmations=utxo.confirmations,
            scriptpubkey_matches=True,
        )

    def requires_neutrino_metadata(self) -> bool:
        """
        Check if this backend requires Neutrino-compatible metadata for UTXO verification.

        Full node backends can verify any UTXO directly.
        Light client backends need scriptpubkey and blockheight hints.

        Returns:
            True if backend requires metadata for verification
        """
        return False

    def can_provide_neutrino_metadata(self) -> bool:
        """
        Check if this backend can provide Neutrino-compatible metadata to peers.

        This determines whether to advertise neutrino_compat feature to the network.
        Backends should return True if they can provide extended UTXO format with
        scriptpubkey and blockheight fields.

        Full node backends (Bitcoin Core) can provide this metadata.
        Light client backends (Neutrino) typically cannot reliably provide it for all UTXOs.

        Returns:
            True if backend can provide scriptpubkey and blockheight for its UTXOs
        """
        # Default: Full nodes can provide metadata, light clients cannot
        return not self.requires_neutrino_metadata()

    async def verify_tx_output(
        self,
        txid: str,
        vout: int,
        address: str,
        start_height: int | None = None,
    ) -> bool:
        """
        Verify that a specific transaction output exists (was broadcast and confirmed).

        This is useful for verifying a transaction was successfully broadcast when
        we know at least one of its output addresses (e.g., our coinjoin destination).

        For full node backends, this uses get_transaction().
        For light clients (neutrino), this uses UTXO lookup with the address hint.

        Args:
            txid: Transaction ID to verify
            vout: Output index to check
            address: The address that should own this output
            start_height: Optional block height hint for light clients (improves performance)

        Returns:
            True if the output exists (transaction was broadcast), False otherwise
        """
        # Default implementation for full node backends
        tx = await self.get_transaction(txid)
        return tx is not None

    async def close(self) -> None:
        """Close backend connection"""
        pass

Abstract blockchain backend interface. Implementations provide access to blockchain data without requiring Bitcoin Core wallet functionality (avoiding BerkeleyDB issues).

Ancestors

  • abc.ABC

Subclasses

Methods

async def broadcast_transaction(self, tx_hex: str) ‑> str
@abstractmethod
async def broadcast_transaction(self, tx_hex: str) -> str:
    """Broadcast transaction, returns txid"""

Broadcast transaction, returns txid

def can_estimate_fee(self) ‑> bool
def can_estimate_fee(self) -> bool:
    """Check if this backend can perform fee estimation.

    Full node backends (Bitcoin Core) can estimate fees.
    Light client backends (Neutrino) typically cannot.

    Returns:
        True if backend supports fee estimation, False otherwise.
    """
    return True

Check if this backend can perform fee estimation.

Full node backends (Bitcoin Core) can estimate fees. Light client backends (Neutrino) typically cannot.

Returns

True if backend supports fee estimation, False otherwise.
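One way a caller might use this check is to fall back to a manually chosen rate when the backend cannot estimate. This is a sketch only: `FixedFeeBackend` is a hypothetical stand-in exposing just the two methods involved, and `pick_fee_rate` is not part of jmwallet.

```python
import asyncio


class FixedFeeBackend:
    """Hypothetical stand-in exposing only the two methods used below."""

    def __init__(self, can_estimate: bool, rate: float = 2.5) -> None:
        self._can_estimate = can_estimate
        self._rate = rate

    def can_estimate_fee(self) -> bool:
        return self._can_estimate

    async def estimate_fee(self, target_blocks: int) -> float:
        return self._rate


async def pick_fee_rate(backend, target_blocks: int, fallback_rate: float) -> float:
    """Use the backend's estimate when available, else the caller-supplied rate."""
    if backend.can_estimate_fee():
        return await backend.estimate_fee(target_blocks)
    return fallback_rate


if __name__ == "__main__":
    print(asyncio.run(pick_fee_rate(FixedFeeBackend(False), 3, fallback_rate=1.0)))
```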

def can_provide_neutrino_metadata(self) ‑> bool
def can_provide_neutrino_metadata(self) -> bool:
    """
    Check if this backend can provide Neutrino-compatible metadata to peers.

    This determines whether to advertise neutrino_compat feature to the network.
    Backends should return True if they can provide extended UTXO format with
    scriptpubkey and blockheight fields.

    Full node backends (Bitcoin Core) can provide this metadata.
    Light client backends (Neutrino) typically cannot reliably provide it for all UTXOs.

    Returns:
        True if backend can provide scriptpubkey and blockheight for its UTXOs
    """
    # Default: Full nodes can provide metadata, light clients cannot
    return not self.requires_neutrino_metadata()

Check if this backend can provide Neutrino-compatible metadata to peers.

This determines whether to advertise neutrino_compat feature to the network. Backends should return True if they can provide extended UTXO format with scriptpubkey and blockheight fields.

Full node backends (Bitcoin Core) can provide this metadata. Light client backends (Neutrino) typically cannot reliably provide it for all UTXOs.

Returns

True if backend can provide scriptpubkey and blockheight for its UTXOs

async def close(self) ‑> None
async def close(self) -> None:
    """Close backend connection"""
    pass

Close backend connection

async def estimate_fee(self, target_blocks: int) ‑> float
@abstractmethod
async def estimate_fee(self, target_blocks: int) -> float:
    """Estimate fee in sat/vbyte for target confirmation blocks.

    Returns:
        Fee rate in sat/vB. Can be fractional (e.g., 0.5 sat/vB).
    """

Estimate the fee rate, in sat/vB, needed for confirmation within target_blocks blocks.

Returns

Fee rate in sat/vB. Can be fractional (e.g., 0.5 sat/vB).
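Because the rate can be fractional, turning it into an absolute fee needs explicit rounding. A minimal sketch, assuming the caller already knows the transaction's virtual size; `fee_in_sats` is a hypothetical helper, not part of jmwallet.

```python
import math


def fee_in_sats(fee_rate_sat_vb: float, vsize_vbytes: int) -> int:
    """Round up so the paid rate never falls below the requested rate."""
    return math.ceil(fee_rate_sat_vb * vsize_vbytes)
```

Rounding up rather than to nearest is a design choice here: a fee rounded down could land slightly under the requested rate.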

async def get_address_balance(self, address: str) ‑> int
@abstractmethod
async def get_address_balance(self, address: str) -> int:
    """Get balance for an address in satoshis"""

Get balance for an address in satoshis

async def get_block_hash(self, block_height: int) ‑> str
@abstractmethod
async def get_block_hash(self, block_height: int) -> str:
    """Get block hash for given height"""

Get block hash for given height

async def get_block_height(self) ‑> int
@abstractmethod
async def get_block_height(self) -> int:
    """Get current blockchain height"""

Get current blockchain height

async def get_block_time(self, block_height: int) ‑> int
@abstractmethod
async def get_block_time(self, block_height: int) -> int:
    """Get block time (unix timestamp) for given height"""

Get block time (unix timestamp) for given height

async def get_mempool_min_fee(self) ‑> float | None
async def get_mempool_min_fee(self) -> float | None:
    """Get the minimum fee rate (in sat/vB) for transaction to be accepted into mempool.

    This is used as a floor for fee estimation to ensure transactions are
    relayed and accepted into the mempool. Returns None if not supported
    or unavailable (e.g., light clients).

    Returns:
        Minimum fee rate in sat/vB, or None if unavailable.
    """
    return None

Get the minimum fee rate (in sat/vB) for a transaction to be accepted into the mempool.

This is used as a floor for fee estimation to ensure transactions are relayed and accepted into the mempool. Returns None if not supported or unavailable (e.g., light clients).

Returns

Minimum fee rate in sat/vB, or None if unavailable.
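The "floor" behaviour described above could look like the following sketch. `apply_mempool_floor` is a hypothetical helper; how jmwallet itself combines the two values is not shown on this page.

```python
from typing import Optional


def apply_mempool_floor(estimate: float, mempool_min_fee: Optional[float]) -> float:
    """Raise the estimate to the mempool minimum, if one is known."""
    if mempool_min_fee is None:
        # Backend could not report a minimum (e.g. a light client).
        return estimate
    return max(estimate, mempool_min_fee)
```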

async def get_transaction(self, txid: str) ‑> Transaction | None
@abstractmethod
async def get_transaction(self, txid: str) -> Transaction | None:
    """Get transaction by txid"""

Get transaction by txid

async def get_utxo(self, txid: str, vout: int) ‑> UTXO | None
@abstractmethod
async def get_utxo(self, txid: str, vout: int) -> UTXO | None:
    """Get a specific UTXO from the blockchain UTXO set (gettxout).
    Returns None if the UTXO does not exist or has been spent."""

Get a specific UTXO from the blockchain UTXO set (gettxout). Returns None if the UTXO does not exist or has been spent.

async def get_utxos(self, addresses: list[str]) ‑> list[UTXO]
@abstractmethod
async def get_utxos(self, addresses: list[str]) -> list[UTXO]:
    """Get UTXOs for given addresses"""

Get UTXOs for given addresses

def has_mempool_access(self) ‑> bool
def has_mempool_access(self) -> bool:
    """Check if this backend can access unconfirmed transactions in the mempool.

    Full node backends (Bitcoin Core) and API backends (Mempool.space) have
    mempool access and can verify transactions immediately after broadcast.

    Light client backends (Neutrino using BIP157/158) cannot access the mempool
    and can only see transactions after they're confirmed in a block. This
    affects broadcast verification strategy - see BroadcastPolicy docs.

    Returns:
        True if backend can see unconfirmed transactions, False otherwise.
    """
    return True

Check if this backend can access unconfirmed transactions in the mempool.

Full node backends (Bitcoin Core) and API backends (Mempool.space) have mempool access and can verify transactions immediately after broadcast.

Light client backends (Neutrino using BIP157/158) cannot access the mempool and can only see transactions after they're confirmed in a block. This affects broadcast verification strategy - see BroadcastPolicy docs.

Returns

True if backend can see unconfirmed transactions, False otherwise.
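A sketch of how mempool access might change the interpretation of a failed lookup after broadcast. `StubBackend` and `broadcast_status` are hypothetical and only illustrate the distinction; the actual strategy is governed by BroadcastPolicy, which is not reproduced here.

```python
import asyncio


class StubBackend:
    """Hypothetical stand-in exposing only the two methods used below."""

    def __init__(self, mempool_access: bool, visible_txids: set) -> None:
        self._mempool_access = mempool_access
        self._visible_txids = visible_txids

    def has_mempool_access(self) -> bool:
        return self._mempool_access

    async def verify_tx_output(self, txid, vout, address, start_height=None) -> bool:
        return txid in self._visible_txids


async def broadcast_status(backend, txid: str, vout: int, address: str) -> str:
    """Classify a broadcast as 'seen', 'missing', or 'pending'."""
    if await backend.verify_tx_output(txid, vout, address):
        return "seen"
    # A backend with mempool access should see the transaction right away,
    # so a miss is meaningful. Without mempool access, a miss only means
    # the transaction has not been confirmed yet.
    return "missing" if backend.has_mempool_access() else "pending"


if __name__ == "__main__":
    print(asyncio.run(broadcast_status(StubBackend(False, set()), "aa", 0, "addr")))
```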

def requires_neutrino_metadata(self) ‑> bool
def requires_neutrino_metadata(self) -> bool:
    """
    Check if this backend requires Neutrino-compatible metadata for UTXO verification.

    Full node backends can verify any UTXO directly.
    Light client backends need scriptpubkey and blockheight hints.

    Returns:
        True if backend requires metadata for verification
    """
    return False

Check if this backend requires Neutrino-compatible metadata for UTXO verification.

Full node backends can verify any UTXO directly. Light client backends need scriptpubkey and blockheight hints.

Returns

True if backend requires metadata for verification
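A caller could branch on this flag to decide which verification path to take. The sketch below uses two hypothetical stub backends and a hypothetical `utxo_is_valid` helper; the stubs return simple namespaces in place of the real UTXO and UTXOVerificationResult types.

```python
import asyncio
from types import SimpleNamespace


class FullNodeStub:
    """Hypothetical full-node-like backend: can look up any UTXO directly."""

    def requires_neutrino_metadata(self) -> bool:
        return False

    async def get_utxo(self, txid, vout):
        return SimpleNamespace(value=50_000) if txid == "aa" else None


class LightClientStub:
    """Hypothetical light-client-like backend: needs peer-supplied hints."""

    def requires_neutrino_metadata(self) -> bool:
        return True

    async def verify_utxo_with_metadata(self, txid, vout, scriptpubkey, blockheight):
        return SimpleNamespace(valid=(txid == "aa"))


async def utxo_is_valid(backend, txid, vout, scriptpubkey=None, blockheight=None) -> bool:
    """Pick the verification path based on what the backend needs."""
    if backend.requires_neutrino_metadata():
        if scriptpubkey is None or blockheight is None:
            return False  # no hints from the peer, so nothing can be verified
        result = await backend.verify_utxo_with_metadata(
            txid, vout, scriptpubkey, blockheight
        )
        return result.valid
    return await backend.get_utxo(txid, vout) is not None


if __name__ == "__main__":
    print(asyncio.run(utxo_is_valid(LightClientStub(), "aa", 0, "0014ab", 800000)))
```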

async def scan_descriptors(self, descriptors: Sequence[str | dict[str, Any]]) ‑> dict[str, typing.Any] | None
async def scan_descriptors(
    self, descriptors: Sequence[str | dict[str, Any]]
) -> dict[str, Any] | None:
    """
    Scan the UTXO set using output descriptors.

    This is an efficient alternative to scanning individual addresses,
    especially useful for HD wallets where xpub descriptors with ranges
    can scan thousands of addresses in a single UTXO set pass.

    Example descriptors:
        - "addr(bc1q...)" - single address
        - "wpkh(xpub.../0/*)" - HD wallet addresses (default range 0-1000)
        - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range

    Args:
        descriptors: List of output descriptors (strings or dicts with range)

    Returns:
        Scan result dict with:
            - success: bool
            - unspents: list of found UTXOs
            - total_amount: sum of all found UTXOs
        Returns None if not supported or on failure.

    Note:
        Not all backends support descriptor scanning. The default implementation
        returns None. Override in backends that support it (e.g., Bitcoin Core).
    """
    # Default: not supported
    return None

Scan the UTXO set using output descriptors.

This is an efficient alternative to scanning individual addresses, especially useful for HD wallets where xpub descriptors with ranges can scan thousands of addresses in a single UTXO set pass.

Example descriptors:

- "addr(bc1q…)" - single address
- "wpkh(xpub…/0/*)" - HD wallet addresses (default range 0-1000)
- {"desc": "wpkh(xpub…/0/*)", "range": [0, 999]} - explicit range

Args

descriptors
List of output descriptors (strings or dicts with range)

Returns

Scan result dict with:

- success: bool
- unspents: list of found UTXOs
- total_amount: sum of all found UTXOs

Returns None if not supported or on failure.

Note

Not all backends support descriptor scanning. The default implementation returns None. Override in backends that support it (e.g., Bitcoin Core).
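Since the default returns None, callers need a fallback path. A sketch under that assumption: `NoScanBackend` and `collect_utxos` are hypothetical, and the helper reports which path produced the items because the two paths return different shapes (scan dicts versus UTXO objects).

```python
import asyncio


class NoScanBackend:
    """Hypothetical backend that keeps the default scan_descriptors() behaviour."""

    async def scan_descriptors(self, descriptors):
        return None  # descriptor scanning not supported

    async def get_utxos(self, addresses):
        return [f"utxo-for-{a}" for a in addresses]


async def collect_utxos(backend, descriptors, addresses):
    """Return (source, items): descriptor scan if supported, else per-address lookup."""
    result = await backend.scan_descriptors(descriptors)
    if result is not None and result.get("success"):
        return "descriptors", result.get("unspents", [])
    return "addresses", await backend.get_utxos(addresses)


if __name__ == "__main__":
    print(asyncio.run(collect_utxos(NoScanBackend(), ["addr(x)"], ["x"])))
```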

async def verify_tx_output(self, txid: str, vout: int, address: str, start_height: int | None = None) ‑> bool
async def verify_tx_output(
    self,
    txid: str,
    vout: int,
    address: str,
    start_height: int | None = None,
) -> bool:
    """
    Verify that a specific transaction output exists (was broadcast and confirmed).

    This is useful for verifying a transaction was successfully broadcast when
    we know at least one of its output addresses (e.g., our coinjoin destination).

    For full node backends, this uses get_transaction().
    For light clients (neutrino), this uses UTXO lookup with the address hint.

    Args:
        txid: Transaction ID to verify
        vout: Output index to check
        address: The address that should own this output
        start_height: Optional block height hint for light clients (improves performance)

    Returns:
        True if the output exists (transaction was broadcast), False otherwise
    """
    # Default implementation for full node backends
    tx = await self.get_transaction(txid)
    return tx is not None

Verify that a specific transaction output exists (was broadcast and confirmed).

This is useful for verifying a transaction was successfully broadcast when we know at least one of its output addresses (e.g., our coinjoin destination).

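For full node backends, the default implementation calls get_transaction() and returns True if the transaction is found; it does not inspect vout or address. Light clients (Neutrino) are expected to override this with a UTXO lookup that uses the address hint.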

Args

txid
Transaction ID to verify
vout
Output index to check
address
The address that should own this output
start_height
Optional block height hint for light clients (improves performance)

Returns

True if the output exists (transaction was broadcast), False otherwise

async def verify_utxo_with_metadata(self, txid: str, vout: int, scriptpubkey: str, blockheight: int) ‑> UTXOVerificationResult
async def verify_utxo_with_metadata(
    self,
    txid: str,
    vout: int,
    scriptpubkey: str,
    blockheight: int,
) -> UTXOVerificationResult:
    """
    Verify a UTXO using provided metadata (neutrino_compat feature).

    This method allows light clients to verify UTXOs without needing
    arbitrary blockchain queries by using metadata provided by the peer.

    The implementation should:
    1. Use scriptpubkey to add the UTXO to watch list (for Neutrino)
    2. Use blockheight as a hint for efficient rescan
    3. Verify the UTXO exists with matching scriptpubkey
    4. Return the UTXO value and confirmations

    Default implementation falls back to get_utxo() for full node backends.

    Args:
        txid: Transaction ID
        vout: Output index
        scriptpubkey: Expected scriptPubKey (hex)
        blockheight: Block height where UTXO was confirmed

    Returns:
        UTXOVerificationResult with verification status and UTXO data
    """
    # Default implementation for full node backends
    # Just uses get_utxo() directly since we can query any UTXO
    utxo = await self.get_utxo(txid, vout)

    if utxo is None:
        return UTXOVerificationResult(
            valid=False,
            error="UTXO not found or spent",
        )

    # Verify scriptpubkey matches
    scriptpubkey_matches = utxo.scriptpubkey.lower() == scriptpubkey.lower()

    if not scriptpubkey_matches:
        return UTXOVerificationResult(
            valid=False,
            value=utxo.value,
            confirmations=utxo.confirmations,
            error="ScriptPubKey mismatch",
            scriptpubkey_matches=False,
        )

    return UTXOVerificationResult(
        valid=True,
        value=utxo.value,
        confirmations=utxo.confirmations,
        scriptpubkey_matches=True,
    )

Verify a UTXO using provided metadata (neutrino_compat feature).

This method allows light clients to verify UTXOs without needing arbitrary blockchain queries by using metadata provided by the peer.

The implementation should:

1. Use scriptpubkey to add the UTXO to watch list (for Neutrino)
2. Use blockheight as a hint for efficient rescan
3. Verify the UTXO exists with matching scriptpubkey
4. Return the UTXO value and confirmations

Default implementation falls back to get_utxo() for full node backends.

Args

txid
Transaction ID
vout
Output index
scriptpubkey
Expected scriptPubKey (hex)
blockheight
Block height where UTXO was confirmed

Returns

UTXOVerificationResult with verification status and UTXO data
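The three outcomes of the default implementation (not found, scriptPubKey mismatch, success) can be reproduced in isolation. In this sketch `VerificationSketch` is a stand-in for UTXOVerificationResult whose field defaults are assumptions, and the UTXO is modelled as a plain dict rather than the real UTXO type.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class VerificationSketch:
    """Stand-in for UTXOVerificationResult; field defaults here are assumptions."""

    valid: bool
    value: int = 0
    confirmations: int = 0
    error: Optional[str] = None
    scriptpubkey_matches: bool = False


def verify_against_utxo(utxo: Optional[dict], expected_spk: str) -> VerificationSketch:
    """Mirror the default flow: missing UTXO, scriptPubKey mismatch, or success."""
    if utxo is None:
        return VerificationSketch(valid=False, error="UTXO not found or spent")
    # Hex strings are compared case-insensitively, as in the default implementation.
    matches = utxo["scriptpubkey"].lower() == expected_spk.lower()
    return VerificationSketch(
        valid=matches,
        value=utxo["value"],
        confirmations=utxo["confirmations"],
        error=None if matches else "ScriptPubKey mismatch",
        scriptpubkey_matches=matches,
    )
```

Note that a mismatch still carries the value and confirmations of the UTXO that was found, which matches the default implementation shown above.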

class DescriptorWalletBackend (rpc_url: str = 'http://127.0.0.1:18443',
rpc_user: str = 'rpcuser',
rpc_password: str = 'rpcpassword',
wallet_name: str = 'jm_descriptor_wallet',
import_timeout: float = 120.0)
class DescriptorWalletBackend(BlockchainBackend):
    """
    Blockchain backend using Bitcoin Core descriptor wallets.

    This backend creates and manages a descriptor wallet in Bitcoin Core,
    importing xpub descriptors for efficient UTXO tracking. Once imported,
    Bitcoin Core automatically tracks UTXOs and provides fast queries via listunspent.

    Usage:
        backend = DescriptorWalletBackend(
            rpc_url="http://127.0.0.1:8332",
            rpc_user="user",
            rpc_password="pass",
            wallet_name="jm_wallet",
        )

        # Setup wallet and import descriptors (one-time or on startup)
        await backend.setup_wallet(descriptors)

        # Fast UTXO queries - no more full UTXO set scans
        utxos = await backend.get_utxos(addresses)
    """

    def __init__(
        self,
        rpc_url: str = "http://127.0.0.1:18443",
        rpc_user: str = "rpcuser",
        rpc_password: str = "rpcpassword",
        wallet_name: str = "jm_descriptor_wallet",
        import_timeout: float = IMPORT_RPC_TIMEOUT,
    ):
        """
        Initialize descriptor wallet backend.

        Args:
            rpc_url: Bitcoin Core RPC URL
            rpc_user: RPC username
            rpc_password: RPC password
            wallet_name: Name for the descriptor wallet in Bitcoin Core
            import_timeout: Timeout for descriptor import operations
        """
        self.rpc_url = rpc_url.rstrip("/")
        self.rpc_user = rpc_user
        self.rpc_password = rpc_password
        self.wallet_name = wallet_name
        self.import_timeout = import_timeout

        logger.info(f"Initialized DescriptorWalletBackend with wallet: {wallet_name}")

        # Client for regular RPC calls
        self.client = httpx.AsyncClient(timeout=DEFAULT_RPC_TIMEOUT, auth=(rpc_user, rpc_password))
        # Client for long-running import operations
        self._import_client = httpx.AsyncClient(
            timeout=import_timeout, auth=(rpc_user, rpc_password)
        )
        self._request_id = 0

        # Track if wallet is setup
        self._wallet_loaded = False
        self._descriptors_imported = False

        # Track background rescan status
        self._background_rescan_height: int | None = None

    def _get_wallet_url(self) -> str:
        """Get the RPC URL for wallet-specific calls."""
        return f"{self.rpc_url}/wallet/{self.wallet_name}"

    async def _rpc_call(
        self,
        method: str,
        params: list | None = None,
        client: httpx.AsyncClient | None = None,
        use_wallet: bool = True,
    ) -> Any:
        """
        Make an RPC call to Bitcoin Core.

        Args:
            method: RPC method name
            params: Method parameters
            client: Optional httpx client (uses default client if not provided)
            use_wallet: If True, use wallet-specific URL

        Returns:
            RPC result

        Raises:
            ValueError: On RPC errors
            httpx.HTTPError: On connection/timeout errors
        """
        self._request_id += 1
        payload = {
            "jsonrpc": "2.0",
            "id": self._request_id,
            "method": method,
            "params": params or [],
        }

        use_client = client or self.client
        url = self._get_wallet_url() if use_wallet and self._wallet_loaded else self.rpc_url

        try:
            response = await use_client.post(url, json=payload)

            # Try to parse JSON response even if status code indicates error
            # Bitcoin Core may return 500 with valid JSON-RPC error details
            try:
                data = response.json()
            except Exception:
                # If JSON parsing fails, raise HTTP error
                response.raise_for_status()
                raise

            if "error" in data and data["error"]:
                error_info = data["error"]
                error_code = error_info.get("code", "unknown")
                error_msg = error_info.get("message", str(error_info))
                raise ValueError(f"RPC error {error_code}: {error_msg}")

            # Check HTTP status only after verifying no RPC error in response
            response.raise_for_status()

            return data.get("result")

        except httpx.TimeoutException as e:
            logger.error(f"RPC call timed out: {method} - {e}")
            raise
        except ValueError:
            # Re-raise ValueError (RPC errors) as-is
            raise
        except httpx.HTTPError as e:
            logger.error(f"RPC call failed: {method} - {e}")
            raise

    async def create_wallet(self, disable_private_keys: bool = True) -> bool:
        """
        Create a descriptor wallet in Bitcoin Core.

        The wallet is created without a passphrase: Bitcoin Core only uses a
        passphrase to encrypt private keys, so a watch-only wallet cannot be
        encrypted. The imported xpubs still reveal transaction history, which
        would undo the privacy benefits of CoinJoin if exposed, so the node's
        wallet data should be treated as sensitive.

        Args:
            disable_private_keys: If True, creates a watch-only wallet (recommended)

        Returns:
            True if wallet was created or already exists
        """
        try:
            # First check if wallet already exists
            wallets = await self._rpc_call("listwallets", use_wallet=False)
            if self.wallet_name in wallets:
                logger.info(f"Wallet '{self.wallet_name}' already loaded")
                self._wallet_loaded = True
                return True

            # Try to load existing wallet
            try:
                await self._rpc_call("loadwallet", [self.wallet_name], use_wallet=False)
                logger.info(f"Loaded existing wallet '{self.wallet_name}'")
                self._wallet_loaded = True
                return True
            except ValueError as e:
                error_str = str(e).lower()
                # RPC error -18 is "Wallet not found" or "Path does not exist"
                not_found_errs = ("not found", "does not exist", "-18")
                if not any(err in error_str for err in not_found_errs):
                    raise

            # Create new descriptor wallet (watch-only, no private keys)
            # Params: wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors
            result = await self._rpc_call(
                "createwallet",
                [
                    self.wallet_name,  # wallet_name
                    disable_private_keys,  # disable_private_keys
                    True,  # blank (no default keys)
                    "",  # passphrase (empty - not supported for watch-only wallets)
                    False,  # avoid_reuse
                    True,  # descriptors (MUST be True for descriptor wallet)
                ],
                use_wallet=False,
            )

            logger.info(f"Created descriptor wallet '{self.wallet_name}': {result}")
            self._wallet_loaded = True
            return True

        except Exception as e:
            logger.error(f"Failed to create/load wallet: {e}")
            raise

    async def _get_smart_scan_timestamp(
        self, lookback_blocks: int = DEFAULT_SCAN_LOOKBACK_BLOCKS
    ) -> int:
        """
        Calculate a smart scan timestamp based on current block height.

        Returns a Unix timestamp corresponding to approximately `lookback_blocks` ago.
        This allows scanning recent history quickly without waiting for a full
        genesis-to-tip rescan.

        Args:
            lookback_blocks: Number of blocks to look back (default: ~1 year)

        Returns:
            Unix timestamp for the target block
        """
        try:
            # Get current block height
            current_height = await self.get_block_height()

            # Calculate target height (don't go below 0)
            target_height = max(0, current_height - lookback_blocks)

            # Get block time at target height
            block_hash = await self.get_block_hash(target_height)
            block_header = await self._rpc_call("getblockheader", [block_hash], use_wallet=False)
            timestamp = block_header.get("time", 0)

            logger.debug(
                f"Smart scan: current height {current_height}, "
                f"target height {target_height}, timestamp {timestamp}"
            )
            return timestamp

        except Exception as e:
            logger.warning(f"Failed to calculate smart scan timestamp: {e}, falling back to 0")
            return 0

    async def import_descriptors(
        self,
        descriptors: Sequence[str | dict[str, Any]],
        rescan: bool = True,
        timestamp: str | int | None = None,
        smart_scan: bool = True,
        background_full_rescan: bool = True,
    ) -> dict[str, Any]:
        """
        Import descriptors into the wallet.

        This is the key operation that enables efficient UTXO tracking. Once imported,
        Bitcoin Core will automatically track all addresses derived from these descriptors.

        Smart Scan Behavior (smart_scan=True):
            Instead of scanning from genesis (which can take 20+ minutes on mainnet),
            the smart scan imports descriptors with a timestamp ~1 year in the past.
            This allows quick startup while still catching most wallet activity.

            If background_full_rescan=True, a full rescan from genesis is triggered
            in the background after the initial import completes. This runs asynchronously
            and ensures no transactions are missed.

        Args:
            descriptors: List of output descriptors. Can be:
                - Simple strings: "wpkh(xpub.../0/*)"
                - Dicts with range:
                  {"desc": "wpkh(xpub.../0/*)", "range": [0, DEFAULT_GAP_LIMIT - 1]}
            rescan: If True, rescan blockchain (behavior depends on smart_scan).
                   If False, only track new transactions (timestamp="now").
            timestamp: Override timestamp. If None, uses smart calculation or 0/"now".
                      Can be Unix timestamp for partial rescan from specific time.
            smart_scan: If True and rescan=True, scan from ~1 year ago instead of genesis.
                       This allows quick startup. (default: True)
            background_full_rescan: If True and smart_scan=True, trigger full rescan
                                   from genesis in background after import. (default: True)

        Returns:
            Dict with 'success_count', 'error_count', 'results' (the raw per-descriptor
            results from Bitcoin Core), and 'background_rescan_started'.

        Example:
            # Smart scan (fast startup, background full rescan)
            await backend.import_descriptors([
                {
                    "desc": "wpkh(xpub.../0/*)",
                    "range": [0, DEFAULT_GAP_LIMIT - 1],
                    "internal": False,
                },
            ], rescan=True, smart_scan=True)

            # Full rescan from genesis (slow but complete)
            await backend.import_descriptors([...], rescan=True, smart_scan=False)

            # No rescan (for brand new wallets with no history)
            await backend.import_descriptors([...], rescan=False)
        """
        if not self._wallet_loaded:
            raise RuntimeError("Wallet not loaded. Call create_wallet() first.")

        # Calculate appropriate timestamp
        background_rescan_needed = False
        if timestamp is None:
            if not rescan:
                timestamp = "now"
            elif smart_scan:
                # Smart scan: start from ~1 year ago for fast startup
                timestamp = await self._get_smart_scan_timestamp()
                background_rescan_needed = background_full_rescan
            else:
                # Full rescan from genesis
                timestamp = 0

        # Format descriptors for importdescriptors RPC
        import_requests = []
        for desc in descriptors:
            if isinstance(desc, str):
                # Add checksum if not present
                desc_with_checksum = await self._add_descriptor_checksum(desc)
                # Only ranged descriptors (those with a * wildcard) can be active;
                # single-address descriptors such as addr(...) are not ranged.
                is_ranged = "*" in desc
                import_requests.append(
                    {
                        "desc": desc_with_checksum,
                        "timestamp": timestamp,
                        "active": is_ranged,  # Only ranged descriptors can be active
                        "internal": False,
                    }
                )
            elif isinstance(desc, dict):
                desc_str = desc.get("desc", "")
                desc_with_checksum = await self._add_descriptor_checksum(desc_str)
                # Determine if descriptor is ranged (has * wildcard or explicit range)
                is_ranged = "*" in desc_str or "range" in desc
                request = {
                    "desc": desc_with_checksum,
                    "timestamp": timestamp,
                    "active": is_ranged,  # Only ranged descriptors can be active
                }
                if "range" in desc:
                    request["range"] = desc["range"]
                if "internal" in desc:
                    request["internal"] = desc["internal"]
                import_requests.append(request)

        if SENSITIVE_LOGGING:
            logger.debug(f"Importing {len(import_requests)} descriptor(s): {import_requests}")
        else:
            if timestamp == 0:
                rescan_info = "from genesis (timestamp=0)"
            elif timestamp == "now":
                rescan_info = "no rescan (timestamp='now')"
            elif smart_scan and background_rescan_needed:
                rescan_info = (
                    f"smart scan from ~1 year ago (timestamp={timestamp}), "
                    "full rescan in background"
                )
            else:
                rescan_info = f"timestamp={timestamp}"
            logger.info(
                f"Importing {len(import_requests)} descriptor(s) into wallet ({rescan_info})..."
            )

        try:
            result = await self._rpc_call(
                "importdescriptors", [import_requests], client=self._import_client
            )

            # Check for errors in results
            success_count = sum(1 for r in result if r.get("success", False))
            error_count = len(result) - success_count

            if error_count > 0:
                errors = [
                    r.get("error", {}).get("message", "unknown")
                    for r in result
                    if not r.get("success", False)
                ]
                logger.warning(f"Import completed with {error_count} error(s): {errors}")
                # Log full results for debugging
                for i, r in enumerate(result):
                    if not r.get("success", False):
                        logger.debug(f"  Descriptor {i} failed: {r}")
            else:
                logger.info(f"Successfully imported {success_count} descriptor(s)")

            # Verify import by listing descriptors
            try:
                verify_result = await self._rpc_call("listdescriptors")
                actual_count = len(verify_result.get("descriptors", []))
                logger.debug(f"Verification: wallet now has {actual_count} descriptor(s)")
                if actual_count == 0 and success_count > 0:
                    logger.error(
                        f"CRITICAL: Import reported {success_count} successes but wallet has "
                        f"0 descriptors! This may indicate a Bitcoin Core bug or wallet issue."
                    )
            except Exception as e:
                logger.warning(f"Could not verify descriptor import: {e}")

            self._descriptors_imported = True

            # Trigger background full rescan if needed
            background_rescan_started = False
            if background_rescan_needed and success_count > 0:
                try:
                    await self.start_background_rescan()
                    background_rescan_started = True
                except Exception as e:
                    logger.warning(f"Failed to start background rescan: {e}")

            return {
                "success_count": success_count,
                "error_count": error_count,
                "results": result,
                "background_rescan_started": background_rescan_started,
            }

        except Exception as e:
            logger.error(f"Failed to import descriptors: {e}")
            raise

    async def _add_descriptor_checksum(self, descriptor: str) -> str:
        """Add checksum to descriptor if not present."""
        if "#" in descriptor:
            return descriptor  # Already has checksum

        try:
            result = await self._rpc_call("getdescriptorinfo", [descriptor], use_wallet=False)
            return result.get("descriptor", descriptor)
        except Exception as e:
            logger.warning(f"Failed to get descriptor checksum: {e}")
            return descriptor

    async def start_background_rescan(self, start_height: int = 0) -> None:
        """
        Start a background blockchain rescan from the given height.

        This triggers a rescan that runs asynchronously in Bitcoin Core.
        The rescan will find any transactions that were missed by the
        initial smart scan (which only scans recent blocks).

        Unlike the synchronous rescan in import_descriptors, this method
        returns immediately and the rescan continues in the background.

        Args:
            start_height: Block height to start rescan from (default: 0 = genesis)
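
        Example:
            # Sketch only: assumes `backend` is an instance of this class with a
            # loaded wallet, and that `asyncio` is imported by the caller.
            await backend.start_background_rescan(start_height=0)
            while backend.is_background_rescan_pending():
                status = await backend.get_rescan_status()
                if status and status.get("in_progress"):
                    print(f"Rescan progress: {status['progress']:.1%}")
                await asyncio.sleep(5)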
        """
        if not self._wallet_loaded:
            raise RuntimeError("Wallet not loaded. Call create_wallet() first.")

        try:
            logger.info(
                f"Starting background blockchain rescan from height {start_height}. "
                "This will run in the background and may take several minutes on mainnet."
            )

            # The rescanblockchain RPC blocks until the rescan finishes, so it is
            # run in a separate asyncio task with its own long-timeout client
            # (see _run_background_rescan) rather than awaited here.
            import asyncio

            # Keep a reference to the task: the event loop only holds weak
            # references, so an unreferenced task can be garbage-collected mid-run.
            self._background_rescan_task = asyncio.create_task(
                self._run_background_rescan(start_height)
            )

            self._background_rescan_height = start_height

        except Exception as e:
            logger.error(f"Failed to start background rescan: {e}")
            raise

    async def _run_background_rescan(self, start_height: int) -> None:
        """
        Internal method to run the background rescan.

        This is executed as a fire-and-forget task.
        """
        try:
            # Use a client with very long timeout for the background rescan
            # 2 hours should be enough for a full mainnet rescan
            background_client = httpx.AsyncClient(
                timeout=7200.0,  # 2 hours
                auth=(self.rpc_user, self.rpc_password),
            )
            try:
                result = await self._rpc_call(
                    "rescanblockchain",
                    [start_height],
                    client=background_client,
                )
                start_h = result.get("start_height", start_height)
                stop_h = result.get("stop_height", "?")
                logger.info(f"Background rescan completed: scanned blocks {start_h} to {stop_h}")
            finally:
                await background_client.aclose()

            self._background_rescan_height = None

        except asyncio.CancelledError:
            logger.info("Background rescan was cancelled")
            self._background_rescan_height = None
        except Exception as e:
            logger.error(f"Background rescan failed: {e}")
            self._background_rescan_height = None

    async def get_rescan_status(self) -> dict[str, Any] | None:
        """
        Check the status of any ongoing wallet rescan.

        Returns:
            {"in_progress": True, "progress": 0.5, "duration": 120} while a rescan
            is running, {"in_progress": False} when none is running, or None if
            the wallet is not loaded or the status could not be fetched.
        """
        if not self._wallet_loaded:
            return None

        try:
            # getwalletinfo includes rescan progress if a rescan is in progress
            wallet_info = await self._rpc_call("getwalletinfo")

            if "scanning" in wallet_info and wallet_info["scanning"]:
                scanning_info = wallet_info["scanning"]
                return {
                    "in_progress": True,
                    "progress": scanning_info.get("progress", 0),
                    "duration": scanning_info.get("duration", 0),
                }

            return {"in_progress": False}

        except Exception as e:
            logger.debug(f"Could not get rescan status: {e}")
            return None

    def is_background_rescan_pending(self) -> bool:
        """Check if a background rescan was started and may still be running."""
        return self._background_rescan_height is not None

    async def wait_for_rescan_complete(
        self,
        poll_interval: float = 5.0,
        timeout: float | None = None,
        progress_callback: Callable[[float], None] | None = None,
    ) -> bool:
        """
        Wait for any ongoing wallet rescan to complete.

        This is useful after importing descriptors with rescan=True to ensure
        the wallet is fully synced before querying UTXOs.

        Args:
            poll_interval: How often to check rescan status (seconds)
            timeout: Maximum time to wait (seconds). None = wait indefinitely.
            progress_callback: Optional callback(progress) called with progress 0.0-1.0

        Returns:
            True if rescan completed, False if timed out
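
        Example:
            # Sketch only: assumes `backend` has a loaded wallet and that
            # `descriptors` is the caller's own list of output descriptors.
            await backend.import_descriptors(descriptors, rescan=True)
            done = await backend.wait_for_rescan_complete(
                poll_interval=2.0,
                timeout=600.0,
                progress_callback=lambda p: print(f"Rescan {p:.0%}"),
            )
            if not done:
                print("Rescan still running; the UTXO list may be incomplete")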
        """
        import time

        start_time = time.time()

        while True:
            status = await self.get_rescan_status()

            if status is None or not status.get("in_progress", False):
                # No rescan in progress, we're done
                return True

            progress = status.get("progress", 0)
            if progress_callback:
                progress_callback(progress)

            logger.debug(f"Rescan in progress: {progress:.1%}")

            if timeout is not None and (time.time() - start_time) > timeout:
                logger.warning(f"Rescan wait timed out after {timeout}s")
                return False

            await asyncio.sleep(poll_interval)

    async def setup_wallet(
        self,
        descriptors: Sequence[str | dict[str, Any]],
        rescan: bool = True,
        smart_scan: bool = True,
        background_full_rescan: bool = True,
    ) -> bool:
        """
        Complete wallet setup: create wallet and import descriptors.

        This is a convenience method for initial setup. By default, uses smart scan
        for fast startup with a background full rescan.

        Args:
            descriptors: Descriptors to import
            rescan: Whether to rescan blockchain
            smart_scan: If True and rescan=True, scan from ~1 year ago (fast startup)
            background_full_rescan: If True and smart_scan=True, run full rescan in background

        Returns:
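            True once the wallet is created and the import call returns. Per-descriptor
            import errors are logged by import_descriptors() rather than raised.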
        """
        await self.create_wallet(disable_private_keys=True)
        await self.import_descriptors(
            descriptors,
            rescan=rescan,
            smart_scan=smart_scan,
            background_full_rescan=background_full_rescan,
        )
        return True

    async def list_descriptors(self) -> list[dict[str, Any]]:
        """
        List all descriptors currently imported in the wallet.

        Returns:
            List of descriptor info dicts with fields like 'desc', 'timestamp', 'active', etc.

        Example:
            descriptors = await backend.list_descriptors()
            for d in descriptors:
                print(f"Descriptor: {d['desc']}, Active: {d.get('active', False)}")
        """
        if not self._wallet_loaded:
            raise RuntimeError("Wallet not loaded. Call create_wallet() first.")

        try:
            result = await self._rpc_call("listdescriptors")
            return result.get("descriptors", [])
        except Exception as e:
            logger.error(f"Failed to list descriptors: {e}")
            raise

    async def is_wallet_setup(self, expected_descriptor_count: int | None = None) -> bool:
        """
        Check if wallet is already set up with imported descriptors.

        Args:
            expected_descriptor_count: If provided, verifies this many descriptors are imported.
                                      For JoinMarket: 2 per mixdepth (external + internal)
                                      Example: 5 mixdepths = 10 descriptors minimum

        Returns:
            True if wallet exists and has descriptors imported

        Example:
            # Check if wallet is set up for 5 mixdepths
            if await backend.is_wallet_setup(expected_descriptor_count=10):
                # Already set up, just sync
                utxos = await wallet.sync_with_descriptor_wallet()
            else:
                # First time - import descriptors
                await wallet.setup_descriptor_wallet(rescan=True)
        """
        try:
            # Check if wallet exists and is loaded
            wallets = await self._rpc_call("listwallets", use_wallet=False)
            if self.wallet_name in wallets:
                self._wallet_loaded = True
            else:
                # Try to load it
                try:
                    await self._rpc_call("loadwallet", [self.wallet_name], use_wallet=False)
                    self._wallet_loaded = True
                except ValueError:
                    return False

            # Check if descriptors are imported
            descriptors = await self.list_descriptors()
            if not descriptors:
                return False

            # If expected count provided, verify
            if expected_descriptor_count is not None:
                return len(descriptors) >= expected_descriptor_count

            return True

        except Exception as e:
            logger.debug(f"Wallet setup check failed: {e}")
            return False

    async def get_utxos(self, addresses: list[str]) -> list[UTXO]:
        """
        Get UTXOs for given addresses using listunspent.

        This is MUCH faster than scantxoutset because:
        1. Only queries wallet's tracked UTXOs (not entire UTXO set)
        2. Includes unconfirmed transactions from mempool
        3. O(wallet size) instead of O(UTXO set size)

        Args:
            addresses: List of addresses to filter by (empty = all wallet UTXOs)

        Returns:
            List of UTXOs
        """
        if not self._wallet_loaded:
            logger.warning("Wallet not loaded, returning empty UTXO list")
            return []

        try:
            # Get current block height for calculating UTXO height
            tip_height = await self.get_block_height()

            # listunspent params: minconf, maxconf, addresses, include_unsafe, query_options
            # minconf=0 includes unconfirmed, maxconf=9999999 includes all confirmed
            # NOTE: When addresses is empty, we must omit it entirely (not pass [])
            # because Bitcoin Core interprets [] as "filter to 0 addresses" = return nothing
            if addresses:
                # Filter to specific addresses
                result = await self._rpc_call(
                    "listunspent",
                    [
                        0,  # minconf - include unconfirmed
                        9999999,  # maxconf
                        addresses,  # filter addresses
                        True,  # include_unsafe (include unconfirmed from mempool)
                    ],
                )
            else:
                # Get all wallet UTXOs - omit addresses parameter
                result = await self._rpc_call(
                    "listunspent",
                    [
                        0,  # minconf - include unconfirmed
                        9999999,  # maxconf
                    ],
                )

            utxos = []
            for utxo_data in result:
                confirmations = utxo_data.get("confirmations", 0)
                height = None
                if confirmations > 0:
                    height = tip_height - confirmations + 1

                utxo = UTXO(
                    txid=utxo_data["txid"],
                    vout=utxo_data["vout"],
                    value=btc_to_sats(utxo_data["amount"]),
                    address=utxo_data.get("address", ""),
                    confirmations=confirmations,
                    scriptpubkey=utxo_data.get("scriptPubKey", ""),
                    height=height,
                )
                utxos.append(utxo)

            logger.debug(f"Found {len(utxos)} UTXOs via listunspent")
            return utxos

        except Exception as e:
            logger.error(f"Failed to get UTXOs via listunspent: {e}")
            return []

    async def get_all_utxos(self) -> list[UTXO]:
        """
        Get all UTXOs tracked by the wallet.

        Returns:
            List of all wallet UTXOs
        """
        return await self.get_utxos([])

    async def get_address_balance(self, address: str) -> int:
        """Get balance for an address in satoshis."""
        utxos = await self.get_utxos([address])
        return sum(utxo.value for utxo in utxos)

    async def get_wallet_balance(self) -> dict[str, int]:
        """
        Get total wallet balance including unconfirmed.

        Returns:
            Dict with 'confirmed', 'unconfirmed', 'total' balances in satoshis
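
        Example:
            # Sketch only: assumes `backend` has a loaded wallet.
            balance = await backend.get_wallet_balance()
            print(f"{balance['total']} sats ({balance['unconfirmed']} unconfirmed)")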
        """
        try:
            result = await self._rpc_call("getbalances")
            mine = result.get("mine", {})
            confirmed = btc_to_sats(mine.get("trusted", 0))
            unconfirmed = btc_to_sats(mine.get("untrusted_pending", 0))
            return {
                "confirmed": confirmed,
                "unconfirmed": unconfirmed,
                "total": confirmed + unconfirmed,
            }
        except Exception as e:
            logger.error(f"Failed to get wallet balance: {e}")
            return {"confirmed": 0, "unconfirmed": 0, "total": 0}

    async def broadcast_transaction(self, tx_hex: str) -> str:
        """Broadcast transaction, returns txid."""
        try:
            txid = await self._rpc_call("sendrawtransaction", [tx_hex], use_wallet=False)
            logger.info(f"Broadcast transaction: {txid}")
            return txid
        except Exception as e:
            logger.error(f"Failed to broadcast transaction: {e}")
            raise ValueError(f"Broadcast failed: {e}") from e

    async def get_transaction(self, txid: str) -> Transaction | None:
        """Get transaction by txid."""
        try:
            # First try wallet transaction for extra info
            try:
                tx_data = await self._rpc_call("gettransaction", [txid, True])
                confirmations = tx_data.get("confirmations", 0)
                block_height = tx_data.get("blockheight")
                block_time = tx_data.get("blocktime")
                raw_hex = tx_data.get("hex", "")
            except ValueError:
                # Fall back to getrawtransaction if not in wallet
                tx_data = await self._rpc_call("getrawtransaction", [txid, True], use_wallet=False)
                if not tx_data:
                    return None
                confirmations = tx_data.get("confirmations", 0)
                block_height = None
                block_time = None
                if "blockhash" in tx_data:
                    block_info = await self._rpc_call(
                        "getblockheader", [tx_data["blockhash"]], use_wallet=False
                    )
                    block_height = block_info.get("height")
                    block_time = block_info.get("time")
                raw_hex = tx_data.get("hex", "")

            return Transaction(
                txid=txid,
                raw=raw_hex,
                confirmations=confirmations,
                block_height=block_height,
                block_time=block_time,
            )
        except Exception as e:
            logger.warning(f"Failed to get transaction {txid}: {e}")
            return None

    async def estimate_fee(self, target_blocks: int) -> float:
        """Estimate fee in sat/vbyte for target confirmation blocks."""
        try:
            result = await self._rpc_call("estimatesmartfee", [target_blocks], use_wallet=False)
            if "feerate" in result:
                # feerate is in BTC/kvB; e.g. 0.0001 BTC/kvB = 10,000 sat/kvB = 10 sat/vB
                btc_per_kb = result["feerate"]
                sat_per_vbyte = btc_to_sats(btc_per_kb) / 1000
                return sat_per_vbyte
            else:
                logger.warning("Fee estimation unavailable, using fallback")
                return 1.0
        except Exception as e:
            logger.warning(f"Failed to estimate fee: {e}, using fallback")
            return 1.0

    async def get_mempool_min_fee(self) -> float | None:
        """Get the minimum fee rate (sat/vB) for a transaction to enter the mempool."""
        try:
            result = await self._rpc_call("getmempoolinfo", use_wallet=False)
            if "mempoolminfee" in result:
                btc_per_kb = result["mempoolminfee"]
                sat_per_vbyte = btc_to_sats(btc_per_kb) / 1000
                logger.debug(f"Mempool min fee: {sat_per_vbyte} sat/vB")
                return sat_per_vbyte
            return None
        except Exception as e:
            logger.debug(f"Failed to get mempool min fee: {e}")
            return None

    async def get_block_height(self) -> int:
        """Get current blockchain height."""
        info = await self._rpc_call("getblockchaininfo", use_wallet=False)
        return info.get("blocks", 0)

    async def get_block_time(self, block_height: int) -> int:
        """Get block time (unix timestamp) for given height."""
        block_hash = await self.get_block_hash(block_height)
        block_header = await self._rpc_call("getblockheader", [block_hash], use_wallet=False)
        return block_header.get("time", 0)

    async def get_block_hash(self, block_height: int) -> str:
        """Get block hash for given height."""
        return await self._rpc_call("getblockhash", [block_height], use_wallet=False)

    async def get_utxo(self, txid: str, vout: int) -> UTXO | None:
        """
        Get a specific UTXO.

        First checks wallet's UTXOs, then falls back to gettxout for non-wallet UTXOs.
        """
        # First check wallet UTXOs (fast)
        try:
            utxos = await self._rpc_call(
                "listunspent",
                [0, 9999999, [], True, {"minimumAmount": 0}],
            )
            for utxo_data in utxos:
                if utxo_data["txid"] == txid and utxo_data["vout"] == vout:
                    return UTXO(
                        txid=utxo_data["txid"],
                        vout=utxo_data["vout"],
                        value=btc_to_sats(utxo_data["amount"]),
                        address=utxo_data.get("address", ""),
                        confirmations=utxo_data.get("confirmations", 0),
                        scriptpubkey=utxo_data.get("scriptPubKey", ""),
                        height=None,
                    )
        except Exception as e:
            logger.debug(f"Wallet UTXO lookup failed: {e}")

        # Fall back to gettxout for non-wallet UTXOs
        try:
            result = await self._rpc_call("gettxout", [txid, vout, True], use_wallet=False)
            if result is None:
                return None

            tip_height = await self.get_block_height()
            confirmations = result.get("confirmations", 0)
            height = tip_height - confirmations + 1 if confirmations > 0 else None

            script_pub_key = result.get("scriptPubKey", {})
            return UTXO(
                txid=txid,
                vout=vout,
                value=btc_to_sats(result.get("value", 0)),
                address=script_pub_key.get("address", ""),
                confirmations=confirmations,
                scriptpubkey=script_pub_key.get("hex", ""),
                height=height,
            )
        except Exception as e:
            logger.error(f"Failed to get UTXO {txid}:{vout}: {e}")
            return None

    async def rescan_blockchain(self, start_height: int = 0) -> dict[str, Any]:
        """
        Rescan blockchain from given height.

        Useful after importing new descriptors or recovering wallet.

        Args:
            start_height: Block height to start rescan from

        Returns:
            Rescan result
        """
        try:
            logger.info(f"Starting blockchain rescan from height {start_height}...")
            result = await self._rpc_call(
                "rescanblockchain",
                [start_height],
                client=self._import_client,  # Use longer timeout
            )
            logger.info(f"Rescan complete: {result}")
            return result
        except Exception as e:
            logger.error(f"Rescan failed: {e}")
            raise

    async def get_new_address(self, address_type: str = "bech32") -> str:
        """
        Get a new address from the wallet.

        Note: This only works if private keys are enabled in the wallet.
        For watch-only wallets, derive addresses from the descriptors instead.
        """
        try:
            return await self._rpc_call("getnewaddress", ["", address_type])
        except ValueError as e:
            if "private keys disabled" in str(e).lower():
                raise RuntimeError(
                    "Cannot generate new addresses in watch-only wallet. "
                    "Derive addresses from your descriptors instead."
                ) from e
            raise

    async def get_addresses_with_history(self) -> set[str]:
        """
        Get all addresses that have ever been involved in transactions.

        Uses listaddressgroupings as the primary source, which returns addresses
        that have been used as inputs or outputs in any transaction. This is more
        reliable than listsinceblock for descriptor wallets because it captures
        address usage even when transaction details aren't fully recorded.

        Falls back to listsinceblock as a secondary source to catch any addresses
        that might only appear in transaction history.

        This is critical for tracking address usage to prevent reuse - a key
        privacy concern for CoinJoin wallets.

        Returns:
            Set of addresses that have ever been used in transactions
        """
        addresses: set[str] = set()

        # Primary source: listaddressgroupings
        # This returns addresses grouped by common ownership (used together in txs)
        # It reliably shows addresses that have been used, even if the transaction
        # details aren't available in listsinceblock (e.g., after wallet import)
        try:
            groupings = await self._rpc_call("listaddressgroupings", [])
            for group in groupings:
                for entry in group:
                    # Each entry is [address, balance, label?]
                    if entry and len(entry) >= 1:
                        addresses.add(entry[0])
            logger.debug(f"Found {len(addresses)} addresses from listaddressgroupings")
        except Exception as e:
            logger.warning(f"Failed to get addresses from listaddressgroupings: {e}")

        # Secondary source: listsinceblock
        # This catches addresses that might only appear in transaction history
        # but weren't grouped (e.g., single-use receive addresses)
        try:
            # listsinceblock params: blockhash (empty = all), target_confirmations,
            #                        include_watchonly, include_removed
            result = await self._rpc_call("listsinceblock", ["", 1, True, False])

            for tx in result.get("transactions", []):
                # Only include "receive" and "generate" categories - these are addresses
                # where this wallet received funds (our own addresses).
                # "send" category includes counterparty addresses we sent TO.
                if "address" in tx and tx.get("category") in ("receive", "generate"):
                    addresses.add(tx["address"])
        except Exception as e:
            logger.warning(f"Failed to get addresses from listsinceblock: {e}")

        logger.debug(f"Total addresses with history: {len(addresses)}")
        return addresses

    async def get_descriptor_ranges(self) -> dict[str, tuple[int, int]]:
        """
        Get the current range for each imported descriptor.

        Returns:
            Dictionary mapping descriptor base (without checksum) to (start, end) range.
            Non-ranged descriptors (addr(...)) report no range and are omitted.

        Example:
            ranges = await backend.get_descriptor_ranges()
            # {"wpkh(xpub.../0/*)": (0, 999), "wpkh(xpub.../1/*)": (0, 999)}
        """
        if not self._wallet_loaded:
            return {}

        try:
            result = await self._rpc_call("listdescriptors")
            ranges: dict[str, tuple[int, int]] = {}

            for desc_info in result.get("descriptors", []):
                desc = desc_info.get("desc", "")
                # Remove checksum for cleaner key
                desc_base = desc.split("#")[0] if "#" in desc else desc

                # Get range - may be [start, end] or just end for simple ranges
                range_info = desc_info.get("range")
                if range_info is not None:
                    if isinstance(range_info, list) and len(range_info) >= 2:
                        ranges[desc_base] = (range_info[0], range_info[1])
                    elif isinstance(range_info, int):
                        ranges[desc_base] = (0, range_info)

            return ranges
        except Exception as e:
            logger.warning(f"Failed to get descriptor ranges: {e}")
            return {}

    async def get_max_descriptor_range(self) -> int:
        """
        Get the maximum range end across all imported descriptors.

        Returns:
            Maximum end index, or DEFAULT_GAP_LIMIT if no descriptors found.
        """
        ranges = await self.get_descriptor_ranges()
        if not ranges:
            return DEFAULT_GAP_LIMIT

        max_end = 0
        for start, end in ranges.values():
            if end > max_end:
                max_end = end

        return max_end if max_end > 0 else DEFAULT_GAP_LIMIT

    async def upgrade_descriptor_ranges(
        self,
        descriptors: Sequence[str | dict[str, Any]],
        new_range_end: int,
        rescan: bool = False,
    ) -> dict[str, Any]:
        """
        Upgrade descriptor ranges to track more addresses.

        This re-imports existing descriptors with a larger range. Bitcoin Core
        will automatically track the new addresses without re-scanning the entire
        blockchain (unless rescan=True is specified).

        This is useful when a wallet has grown beyond the initially imported range.
        For example, if originally imported with range [0, 999] and now need to
        track addresses up to index 5000.

        Args:
            descriptors: List of descriptors to upgrade (same format as import_descriptors)
            new_range_end: New end index for the range (e.g., 5000 for [0, 5000])
            rescan: Whether to rescan blockchain for the new addresses.
                   Usually not needed if wallet was already tracking some range.

        Returns:
            Import result from Bitcoin Core

        Note:
            Re-importing with a larger range is safe - Bitcoin Core will extend
            the tracking without duplicating or losing existing data.
        """
        if not self._wallet_loaded:
            raise RuntimeError("Wallet not loaded. Call create_wallet() first.")

        # Update ranges in descriptor dicts
        updated_descriptors = []
        for desc in descriptors:
            if isinstance(desc, str):
                # String descriptor - add range
                updated_descriptors.append(
                    {
                        "desc": desc,
                        "range": [0, new_range_end],
                    }
                )
            elif isinstance(desc, dict):
                # Dict descriptor - update range
                updated = dict(desc)
                if "*" in updated.get("desc", ""):  # Only ranged descriptors
                    updated["range"] = [0, new_range_end]
                updated_descriptors.append(updated)

        logger.info(
            f"Upgrading {len(updated_descriptors)} descriptor(s) to range [0, {new_range_end}]"
        )

        # Re-import with new range
        # timestamp="now" means don't rescan unless explicitly requested
        return await self.import_descriptors(
            updated_descriptors,
            rescan=rescan,
            timestamp=0 if rescan else "now",
            smart_scan=False,  # Don't use smart scan for upgrades
            background_full_rescan=False,
        )

    async def unload_wallet(self) -> None:
        """Unload the wallet from Bitcoin Core."""
        if self._wallet_loaded:
            try:
                await self._rpc_call("unloadwallet", [self.wallet_name], use_wallet=False)
                logger.info(f"Unloaded wallet '{self.wallet_name}'")
                self._wallet_loaded = False
            except Exception as e:
                logger.warning(f"Failed to unload wallet: {e}")

    def can_provide_neutrino_metadata(self) -> bool:
        """Bitcoin Core can provide Neutrino-compatible metadata."""
        return True

    async def close(self) -> None:
        """Close backend connections."""
        await self.client.aclose()
        await self._import_client.aclose()

Blockchain backend using Bitcoin Core descriptor wallets.

This backend creates and manages a descriptor wallet in Bitcoin Core, importing xpub descriptors for efficient UTXO tracking. Once imported, Bitcoin Core automatically tracks UTXOs and provides fast queries via listunspent.

Usage

backend = DescriptorWalletBackend(
    rpc_url="http://127.0.0.1:8332",
    rpc_user="user",
    rpc_password="pass",
    wallet_name="jm_wallet",
)

# Setup wallet and import descriptors (one-time or on startup)
await backend.setup_wallet(descriptors)

# Fast UTXO queries - no more full UTXO set scans
utxos = await backend.get_utxos(addresses)

Initialize descriptor wallet backend.

Args

rpc_url
Bitcoin Core RPC URL
rpc_user
RPC username
rpc_password
RPC password
wallet_name
Name for the descriptor wallet in Bitcoin Core
import_timeout
Timeout for descriptor import operations

Ancestors

Methods

async def broadcast_transaction(self, tx_hex: str) ‑> str
async def broadcast_transaction(self, tx_hex: str) -> str:
    """Broadcast transaction, returns txid."""
    try:
        txid = await self._rpc_call("sendrawtransaction", [tx_hex], use_wallet=False)
        logger.info(f"Broadcast transaction: {txid}")
        return txid
    except Exception as e:
        logger.error(f"Failed to broadcast transaction: {e}")
        raise ValueError(f"Broadcast failed: {e}") from e

Broadcast transaction, returns txid.

def can_provide_neutrino_metadata(self) ‑> bool
def can_provide_neutrino_metadata(self) -> bool:
    """Bitcoin Core can provide Neutrino-compatible metadata."""
    return True

Bitcoin Core can provide Neutrino-compatible metadata.

async def close(self) ‑> None
async def close(self) -> None:
    """Close backend connections."""
    await self.client.aclose()
    await self._import_client.aclose()

Close backend connections.

async def create_wallet(self, disable_private_keys: bool = True) ‑> bool
async def create_wallet(self, disable_private_keys: bool = True) -> bool:
    """
    Create a descriptor wallet in Bitcoin Core.

    The wallet is created blank and without a passphrase. Bitcoin Core only
    encrypts private keys, so a passphrase cannot be set on a watch-only
    wallet and the imported xpubs are stored unencrypted. Restrict access to
    the wallet accordingly: xpubs reveal transaction history, which would
    undo the privacy benefits of CoinJoin if exposed.

    Args:
        disable_private_keys: If True, creates a watch-only wallet (recommended)

    Returns:
        True if wallet was created or already exists
    """
    try:
        # First check if wallet already exists
        wallets = await self._rpc_call("listwallets", use_wallet=False)
        if self.wallet_name in wallets:
            logger.info(f"Wallet '{self.wallet_name}' already loaded")
            self._wallet_loaded = True
            return True

        # Try to load existing wallet
        try:
            await self._rpc_call("loadwallet", [self.wallet_name], use_wallet=False)
            logger.info(f"Loaded existing wallet '{self.wallet_name}'")
            self._wallet_loaded = True
            return True
        except ValueError as e:
            error_str = str(e).lower()
            # RPC error -18 is "Wallet not found" or "Path does not exist"
            not_found_errs = ("not found", "does not exist", "-18")
            if not any(err in error_str for err in not_found_errs):
                raise

        # Create new descriptor wallet (watch-only, no private keys)
        # Params: wallet_name, disable_private_keys, blank, passphrase, avoid_reuse, descriptors
        result = await self._rpc_call(
            "createwallet",
            [
                self.wallet_name,  # wallet_name
                disable_private_keys,  # disable_private_keys
                True,  # blank (no default keys)
                "",  # passphrase (empty - not supported for watch-only wallets)
                False,  # avoid_reuse
                True,  # descriptors (MUST be True for descriptor wallet)
            ],
            use_wallet=False,
        )

        logger.info(f"Created descriptor wallet '{self.wallet_name}': {result}")
        self._wallet_loaded = True
        return True

    except Exception as e:
        logger.error(f"Failed to create/load wallet: {e}")
        raise

Create a descriptor wallet in Bitcoin Core.

The wallet is created blank and without a passphrase. Bitcoin Core only encrypts private keys, so a passphrase cannot be set on a watch-only wallet and the imported xpubs are stored unencrypted. Restrict access to the wallet accordingly: xpubs reveal transaction history, which would undo the privacy benefits of CoinJoin if exposed.

Args

disable_private_keys
If True, creates a watch-only wallet (recommended)

Returns

True if wallet was created or already exists
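
The load-then-create flow above depends on telling a "wallet does not exist" error apart from every other loadwallet failure. A minimal sketch of that check, assuming a hypothetical helper name is_wallet_not_found and made-up error messages (the exact text Bitcoin Core returns may differ by version):

```python
# Substrings that create_wallet() treats as "wallet is missing, go create it".
NOT_FOUND_MARKERS = ("not found", "does not exist", "-18")


def is_wallet_not_found(error: Exception) -> bool:
    """Return True if the error text looks like a missing-wallet RPC error."""
    message = str(error).lower()
    return any(marker in message for marker in NOT_FOUND_MARKERS)


# Hypothetical error messages, for illustration only.
missing = ValueError("RPC error -18: Path does not exist")
other = ValueError("Wallet file verification failed")

print(is_wallet_not_found(missing))  # True
print(is_wallet_not_found(other))    # False
```

Matching on substrings is deliberately loose. Any other error is re-raised by create_wallet() instead of triggering wallet creation.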

async def estimate_fee(self, target_blocks: int) ‑> float
async def estimate_fee(self, target_blocks: int) -> float:
    """Estimate fee in sat/vbyte for target confirmation blocks."""
    try:
        result = await self._rpc_call("estimatesmartfee", [target_blocks], use_wallet=False)
        if "feerate" in result:
            btc_per_kb = result["feerate"]
            sat_per_vbyte = btc_to_sats(btc_per_kb) / 1000
            return sat_per_vbyte
        else:
            logger.warning("Fee estimation unavailable, using fallback")
            return 1.0
    except Exception as e:
        logger.warning(f"Failed to estimate fee: {e}, using fallback")
        return 1.0

Estimate fee in sat/vbyte for target confirmation blocks.
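
estimatesmartfee reports its feerate in BTC per 1000 virtual bytes, so the method converts to satoshis and divides by 1000. A minimal sketch of that arithmetic, assuming a hypothetical helper btc_per_kvb_to_sat_per_vb as a stand-in for jmwallet's btc_to_sats (which is not shown here and may round differently):

```python
from decimal import Decimal

SATS_PER_BTC = 100_000_000


def btc_per_kvb_to_sat_per_vb(btc_per_kvb: float) -> float:
    """Convert a BTC/kvB fee rate to sat/vB."""
    # Go through Decimal(str(...)) so 0.0001 is treated as exactly 0.0001.
    sats_per_kvb = Decimal(str(btc_per_kvb)) * SATS_PER_BTC
    # 1 kvB = 1000 vB
    return float(sats_per_kvb / 1000)


print(btc_per_kvb_to_sat_per_vb(0.0001))   # 10.0
print(btc_per_kvb_to_sat_per_vb(0.00001))  # 1.0
```

The same conversion applies to the mempoolminfee value used by get_mempool_min_fee().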

async def get_address_balance(self, address: str) ‑> int
async def get_address_balance(self, address: str) -> int:
    """Get balance for an address in satoshis."""
    utxos = await self.get_utxos([address])
    return sum(utxo.value for utxo in utxos)

Get balance for an address in satoshis.

async def get_addresses_with_history(self) ‑> set[str]
async def get_addresses_with_history(self) -> set[str]:
    """
    Get all addresses that have ever been involved in transactions.

    Uses listaddressgroupings as the primary source, which returns addresses
    that have been used as inputs or outputs in any transaction. This is more
    reliable than listsinceblock for descriptor wallets because it captures
    address usage even when transaction details aren't fully recorded.

    Also queries listsinceblock as a secondary source to catch any addresses
    that might only appear in transaction history.

    This is critical for tracking address usage to prevent reuse - a key
    privacy concern for CoinJoin wallets.

    Returns:
        Set of addresses that have ever been used in transactions
    """
    addresses: set[str] = set()

    # Primary source: listaddressgroupings
    # This returns addresses grouped by common ownership (used together in txs)
    # It reliably shows addresses that have been used, even if the transaction
    # details aren't available in listsinceblock (e.g., after wallet import)
    try:
        groupings = await self._rpc_call("listaddressgroupings", [])
        for group in groupings:
            for entry in group:
                # Each entry is [address, balance, label?]
                if entry and len(entry) >= 1:
                    addresses.add(entry[0])
        logger.debug(f"Found {len(addresses)} addresses from listaddressgroupings")
    except Exception as e:
        logger.warning(f"Failed to get addresses from listaddressgroupings: {e}")

    # Secondary source: listsinceblock
    # This catches addresses that might only appear in transaction history
    # but weren't grouped (e.g., single-use receive addresses)
    try:
        # listsinceblock params: blockhash (empty = all), target_confirmations,
        #                        include_watchonly, include_removed
        result = await self._rpc_call("listsinceblock", ["", 1, True, False])

        for tx in result.get("transactions", []):
            # Only include "receive" and "generate" categories - these are addresses
            # where this wallet received funds (our own addresses).
            # "send" category includes counterparty addresses we sent TO.
            if "address" in tx and tx.get("category") in ("receive", "generate"):
                addresses.add(tx["address"])
    except Exception as e:
        logger.warning(f"Failed to get addresses from listsinceblock: {e}")

    logger.debug(f"Total addresses with history: {len(addresses)}")
    return addresses

Get all addresses that have ever been involved in transactions.

Uses listaddressgroupings as the primary source, which returns addresses that have been used as inputs or outputs in any transaction. This is more reliable than listsinceblock for descriptor wallets because it captures address usage even when transaction details aren't fully recorded.

Also queries listsinceblock as a secondary source to catch any addresses that might only appear in transaction history.

This is critical for tracking address usage to prevent reuse - a key privacy concern for CoinJoin wallets.

Returns

Set of addresses that have ever been used in transactions
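
The merge of the two RPC results can be illustrated without a node. A minimal sketch, assuming hypothetical helper names and placeholder address strings (not real addresses), that applies the same filtering rules to sample listaddressgroupings and listsinceblock data:

```python
def addresses_from_groupings(groupings: list) -> set[str]:
    """Collect entry[0] from each [address, balance, label?] entry."""
    return {entry[0] for group in groupings for entry in group if entry}


def addresses_from_history(transactions: list) -> set[str]:
    """Keep only addresses where the wallet itself received funds."""
    return {
        tx["address"]
        for tx in transactions
        if "address" in tx and tx.get("category") in ("receive", "generate")
    }


# Hypothetical sample data shaped like the RPC responses.
groupings = [[["own-addr-a", 0.5], ["own-addr-b", 0.0, "label"]]]
transactions = [
    {"address": "own-addr-c", "category": "receive"},
    {"address": "counterparty-addr", "category": "send"},  # not ours, skipped
    {"category": "receive"},  # no address field, skipped
]

used = addresses_from_groupings(groupings) | addresses_from_history(transactions)
print(sorted(used))  # ['own-addr-a', 'own-addr-b', 'own-addr-c']
```

Using a set makes the union idempotent, so an address reported by both RPCs is counted once.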

async def get_all_utxos(self) ‑> list[UTXO]
async def get_all_utxos(self) -> list[UTXO]:
    """
    Get all UTXOs tracked by the wallet.

    Returns:
        List of all wallet UTXOs
    """
    return await self.get_utxos([])

Get all UTXOs tracked by the wallet.

Returns

List of all wallet UTXOs

async def get_block_hash(self, block_height: int) ‑> str
async def get_block_hash(self, block_height: int) -> str:
    """Get block hash for given height."""
    return await self._rpc_call("getblockhash", [block_height], use_wallet=False)

Get block hash for given height.

async def get_block_height(self) ‑> int
async def get_block_height(self) -> int:
    """Get current blockchain height."""
    info = await self._rpc_call("getblockchaininfo", use_wallet=False)
    return info.get("blocks", 0)

Get current blockchain height.

async def get_block_time(self, block_height: int) ‑> int
async def get_block_time(self, block_height: int) -> int:
    """Get block time (unix timestamp) for given height."""
    block_hash = await self.get_block_hash(block_height)
    block_header = await self._rpc_call("getblockheader", [block_hash], use_wallet=False)
    return block_header.get("time", 0)

Get block time (unix timestamp) for given height.

async def get_descriptor_ranges(self) ‑> dict[str, tuple[int, int]]
async def get_descriptor_ranges(self) -> dict[str, tuple[int, int]]:
    """
    Get the current range for each imported descriptor.

    Returns:
        Dictionary mapping descriptor base (without checksum) to (start, end) range.
        Non-ranged descriptors (addr(...)) report no range and are omitted.

    Example:
        ranges = await backend.get_descriptor_ranges()
        # {"wpkh(xpub.../0/*)": (0, 999), "wpkh(xpub.../1/*)": (0, 999)}
    """
    if not self._wallet_loaded:
        return {}

    try:
        result = await self._rpc_call("listdescriptors")
        ranges: dict[str, tuple[int, int]] = {}

        for desc_info in result.get("descriptors", []):
            desc = desc_info.get("desc", "")
            # Remove checksum for cleaner key
            desc_base = desc.split("#")[0] if "#" in desc else desc

            # Get range - may be [start, end] or just end for simple ranges
            range_info = desc_info.get("range")
            if range_info is not None:
                if isinstance(range_info, list) and len(range_info) >= 2:
                    ranges[desc_base] = (range_info[0], range_info[1])
                elif isinstance(range_info, int):
                    ranges[desc_base] = (0, range_info)

        return ranges
    except Exception as e:
        logger.warning(f"Failed to get descriptor ranges: {e}")
        return {}

Get the current range for each imported descriptor.

Returns

Dictionary mapping descriptor base (without checksum) to (start, end) range. Non-ranged descriptors (addr(…)) report no range and are omitted.

Example

ranges = await backend.get_descriptor_ranges()
# {"wpkh(xpub.../0/*)": (0, 999), "wpkh(xpub.../1/*)": (0, 999)}
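
The parsing step can be tried offline. A minimal sketch, assuming hypothetical helper names, made-up checksums, and a caller-supplied default in place of DEFAULT_GAP_LIMIT (whose value is not shown here), that applies the same rules to a sample listdescriptors response:

```python
def parse_descriptor_ranges(result: dict) -> dict[str, tuple[int, int]]:
    """Map each descriptor (checksum stripped) to its (start, end) range."""
    ranges: dict[str, tuple[int, int]] = {}
    for info in result.get("descriptors", []):
        base = info.get("desc", "").split("#")[0]
        rng = info.get("range")
        if isinstance(rng, list) and len(rng) >= 2:
            ranges[base] = (rng[0], rng[1])
        elif isinstance(rng, int):
            ranges[base] = (0, rng)
        # Entries without a range (e.g. addr(...)) are skipped.
    return ranges


def max_range_end(ranges: dict[str, tuple[int, int]], default: int) -> int:
    """Largest end index, or the default when nothing usable was found."""
    return max((end for _, end in ranges.values()), default=0) or default


# Hypothetical response; the "xpub..." and "bc1q..." parts are placeholders.
sample = {
    "descriptors": [
        {"desc": "wpkh(xpub.../0/*)#aaaaaaaa", "range": [0, 999]},
        {"desc": "wpkh(xpub.../1/*)#bbbbbbbb", "range": 499},
        {"desc": "addr(bc1q...)#cccccccc"},
    ]
}

ranges = parse_descriptor_ranges(sample)
print(ranges)  # {'wpkh(xpub.../0/*)': (0, 999), 'wpkh(xpub.../1/*)': (0, 499)}
print(max_range_end(ranges, default=100))  # 999
```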

async def get_max_descriptor_range(self) ‑> int
async def get_max_descriptor_range(self) -> int:
    """
    Get the maximum range end across all imported descriptors.

    Returns:
        Maximum end index, or DEFAULT_GAP_LIMIT if no descriptors found.
    """
    ranges = await self.get_descriptor_ranges()
    if not ranges:
        return DEFAULT_GAP_LIMIT

    max_end = 0
    for start, end in ranges.values():
        if end > max_end:
            max_end = end

    return max_end if max_end > 0 else DEFAULT_GAP_LIMIT

Get the maximum range end across all imported descriptors.

Returns

Maximum end index, or DEFAULT_GAP_LIMIT if no descriptors found.

async def get_mempool_min_fee(self) ‑> float | None
async def get_mempool_min_fee(self) -> float | None:
    """Get the minimum fee rate (in sat/vB) for a transaction to be accepted into the mempool."""
    try:
        result = await self._rpc_call("getmempoolinfo", use_wallet=False)
        if "mempoolminfee" in result:
            btc_per_kb = result["mempoolminfee"]
            sat_per_vbyte = btc_to_sats(btc_per_kb) / 1000
            logger.debug(f"Mempool min fee: {sat_per_vbyte} sat/vB")
            return sat_per_vbyte
        return None
    except Exception as e:
        logger.debug(f"Failed to get mempool min fee: {e}")
        return None

Get the minimum fee rate (in sat/vB) for a transaction to be accepted into the mempool.

async def get_new_address(self, address_type: str = 'bech32') ‑> str
async def get_new_address(self, address_type: str = "bech32") -> str:
    """
    Get a new address from the wallet.

    Note: This only works if private keys are enabled in the wallet.
    For watch-only wallets, derive addresses from the descriptors instead.
    """
    try:
        return await self._rpc_call("getnewaddress", ["", address_type])
    except ValueError as e:
        if "private keys disabled" in str(e).lower():
            raise RuntimeError(
                "Cannot generate new addresses in watch-only wallet. "
                "Derive addresses from your descriptors instead."
            ) from e
        raise

Get a new address from the wallet.

Note: This only works if private keys are enabled in the wallet. For watch-only wallets, derive addresses from the descriptors instead.

async def get_rescan_status(self) ‑> dict[str, typing.Any] | None
async def get_rescan_status(self) -> dict[str, Any] | None:
    """
    Check the status of any ongoing wallet rescan.

    Returns:
        Dict with an "in_progress" flag, plus "progress" and "duration" while a
        rescan is running. None if the wallet is not loaded or the query fails.
        Example: {"in_progress": True, "progress": 0.5, "duration": 120}
    """
    if not self._wallet_loaded:
        return None

    try:
        # getwalletinfo includes rescan progress if a rescan is in progress
        wallet_info = await self._rpc_call("getwalletinfo")

        if "scanning" in wallet_info and wallet_info["scanning"]:
            scanning_info = wallet_info["scanning"]
            return {
                "in_progress": True,
                "progress": scanning_info.get("progress", 0),
                "duration": scanning_info.get("duration", 0),
            }

        return {"in_progress": False}

    except Exception as e:
        logger.debug(f"Could not get rescan status: {e}")
        return None

Check the status of any ongoing wallet rescan.

Returns

Dict with an "in_progress" flag, plus "progress" and "duration" while a rescan is running. None if the wallet is not loaded or the query fails.

Example

{"in_progress": True, "progress": 0.5, "duration": 120}

async def get_transaction(self, txid: str) ‑> Transaction | None
async def get_transaction(self, txid: str) -> Transaction | None:
    """Get transaction by txid."""
    try:
        # First try wallet transaction for extra info
        try:
            tx_data = await self._rpc_call("gettransaction", [txid, True])
            confirmations = tx_data.get("confirmations", 0)
            block_height = tx_data.get("blockheight")
            block_time = tx_data.get("blocktime")
            raw_hex = tx_data.get("hex", "")
        except ValueError:
            # Fall back to getrawtransaction if not in wallet
            tx_data = await self._rpc_call("getrawtransaction", [txid, True], use_wallet=False)
            if not tx_data:
                return None
            confirmations = tx_data.get("confirmations", 0)
            block_height = None
            block_time = None
            if "blockhash" in tx_data:
                block_info = await self._rpc_call(
                    "getblockheader", [tx_data["blockhash"]], use_wallet=False
                )
                block_height = block_info.get("height")
                block_time = block_info.get("time")
            raw_hex = tx_data.get("hex", "")

        return Transaction(
            txid=txid,
            raw=raw_hex,
            confirmations=confirmations,
            block_height=block_height,
            block_time=block_time,
        )
    except Exception as e:
        logger.warning(f"Failed to get transaction {txid}: {e}")
        return None

Get transaction by txid.

async def get_utxo(self, txid: str, vout: int) ‑> UTXO | None
async def get_utxo(self, txid: str, vout: int) -> UTXO | None:
    """
    Get a specific UTXO.

    First checks wallet's UTXOs, then falls back to gettxout for non-wallet UTXOs.
    """
    # First check wallet UTXOs (fast)
    try:
        utxos = await self._rpc_call(
            "listunspent",
            [0, 9999999, [], True, {"minimumAmount": 0}],
        )
        for utxo_data in utxos:
            if utxo_data["txid"] == txid and utxo_data["vout"] == vout:
                return UTXO(
                    txid=utxo_data["txid"],
                    vout=utxo_data["vout"],
                    value=btc_to_sats(utxo_data["amount"]),
                    address=utxo_data.get("address", ""),
                    confirmations=utxo_data.get("confirmations", 0),
                    scriptpubkey=utxo_data.get("scriptPubKey", ""),
                    height=None,
                )
    except Exception as e:
        logger.debug(f"Wallet UTXO lookup failed: {e}")

    # Fall back to gettxout for non-wallet UTXOs
    try:
        result = await self._rpc_call("gettxout", [txid, vout, True], use_wallet=False)
        if result is None:
            return None

        tip_height = await self.get_block_height()
        confirmations = result.get("confirmations", 0)
        height = tip_height - confirmations + 1 if confirmations > 0 else None

        script_pub_key = result.get("scriptPubKey", {})
        return UTXO(
            txid=txid,
            vout=vout,
            value=btc_to_sats(result.get("value", 0)),
            address=script_pub_key.get("address", ""),
            confirmations=confirmations,
            scriptpubkey=script_pub_key.get("hex", ""),
            height=height,
        )
    except Exception as e:
        logger.error(f"Failed to get UTXO {txid}:{vout}: {e}")
        return None

Get a specific UTXO.

First checks wallet's UTXOs, then falls back to gettxout for non-wallet UTXOs.

async def get_utxos(self, addresses: list[str]) ‑> list[UTXO]
async def get_utxos(self, addresses: list[str]) -> list[UTXO]:
    """
    Get UTXOs for given addresses using listunspent.

    This is MUCH faster than scantxoutset because:
    1. Only queries wallet's tracked UTXOs (not entire UTXO set)
    2. Includes unconfirmed transactions from mempool
    3. O(wallet size) instead of O(UTXO set size)

    Args:
        addresses: List of addresses to filter by (empty = all wallet UTXOs)

    Returns:
        List of UTXOs
    """
    if not self._wallet_loaded:
        logger.warning("Wallet not loaded, returning empty UTXO list")
        return []

    try:
        # Get current block height for calculating UTXO height
        tip_height = await self.get_block_height()

        # listunspent params: minconf, maxconf, addresses, include_unsafe, query_options
        # minconf=0 includes unconfirmed, maxconf=9999999 includes all confirmed
        # NOTE: When addresses is empty, we must omit it entirely (not pass [])
        # because Bitcoin Core interprets [] as "filter to 0 addresses" = return nothing
        if addresses:
            # Filter to specific addresses
            result = await self._rpc_call(
                "listunspent",
                [
                    0,  # minconf - include unconfirmed
                    9999999,  # maxconf
                    addresses,  # filter addresses
                    True,  # include_unsafe (include unconfirmed from mempool)
                ],
            )
        else:
            # Get all wallet UTXOs - omit addresses parameter
            result = await self._rpc_call(
                "listunspent",
                [
                    0,  # minconf - include unconfirmed
                    9999999,  # maxconf
                ],
            )

        utxos = []
        for utxo_data in result:
            confirmations = utxo_data.get("confirmations", 0)
            height = None
            if confirmations > 0:
                height = tip_height - confirmations + 1

            utxo = UTXO(
                txid=utxo_data["txid"],
                vout=utxo_data["vout"],
                value=btc_to_sats(utxo_data["amount"]),
                address=utxo_data.get("address", ""),
                confirmations=confirmations,
                scriptpubkey=utxo_data.get("scriptPubKey", ""),
                height=height,
            )
            utxos.append(utxo)

        logger.debug(f"Found {len(utxos)} UTXOs via listunspent")
        return utxos

    except Exception as e:
        logger.error(f"Failed to get UTXOs via listunspent: {e}")
        return []

Get UTXOs for given addresses using listunspent.

This is MUCH faster than scantxoutset because:

1. Only queries wallet's tracked UTXOs (not entire UTXO set)
2. Includes unconfirmed transactions from mempool
3. O(wallet size) instead of O(UTXO set size)

Args

addresses
List of addresses to filter by (empty = all wallet UTXOs)

Returns

List of UTXOs

async def get_wallet_balance(self) ‑> dict[str, int]
async def get_wallet_balance(self) -> dict[str, int]:
    """
    Get total wallet balance including unconfirmed.

    Returns:
        Dict with 'confirmed', 'unconfirmed', 'total' balances in satoshis
    """
    try:
        result = await self._rpc_call("getbalances")
        mine = result.get("mine", {})
        confirmed = btc_to_sats(mine.get("trusted", 0))
        unconfirmed = btc_to_sats(mine.get("untrusted_pending", 0))
        return {
            "confirmed": confirmed,
            "unconfirmed": unconfirmed,
            "total": confirmed + unconfirmed,
        }
    except Exception as e:
        logger.error(f"Failed to get wallet balance: {e}")
        return {"confirmed": 0, "unconfirmed": 0, "total": 0}

Get total wallet balance including unconfirmed.

Returns

Dict with 'confirmed', 'unconfirmed', 'total' balances in satoshis
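
The mapping from getbalances to the returned dict can be checked with sample data. A minimal sketch, assuming a hypothetical to_sats helper standing in for jmwallet's btc_to_sats (not shown here) and made-up balance values:

```python
from decimal import Decimal


def to_sats(btc: float) -> int:
    """Convert a BTC amount to integer satoshis without float drift."""
    return int(Decimal(str(btc)) * 100_000_000)


def summarize_balances(getbalances_result: dict) -> dict[str, int]:
    """Reduce the 'mine' section of getbalances to satoshi totals."""
    mine = getbalances_result.get("mine", {})
    confirmed = to_sats(mine.get("trusted", 0))
    unconfirmed = to_sats(mine.get("untrusted_pending", 0))
    return {
        "confirmed": confirmed,
        "unconfirmed": unconfirmed,
        "total": confirmed + unconfirmed,
    }


# Hypothetical getbalances response.
sample = {"mine": {"trusted": 0.015, "untrusted_pending": 0.0005, "immature": 0}}
print(summarize_balances(sample))
# {'confirmed': 1500000, 'unconfirmed': 50000, 'total': 1550000}
```

Note that the "immature" balance is not included in any of the three figures, matching the method above.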

async def import_descriptors(self,
descriptors: Sequence[str | dict[str, Any]],
rescan: bool = True,
timestamp: str | int | None = None,
smart_scan: bool = True,
background_full_rescan: bool = True) ‑> dict[str, typing.Any]
async def import_descriptors(
    self,
    descriptors: Sequence[str | dict[str, Any]],
    rescan: bool = True,
    timestamp: str | int | None = None,
    smart_scan: bool = True,
    background_full_rescan: bool = True,
) -> dict[str, Any]:
    """
    Import descriptors into the wallet.

    This is the key operation that enables efficient UTXO tracking. Once imported,
    Bitcoin Core will automatically track all addresses derived from these descriptors.

    Smart Scan Behavior (smart_scan=True):
        Instead of scanning from genesis (which can take 20+ minutes on mainnet),
        the smart scan imports descriptors with a timestamp ~1 year in the past.
        This allows quick startup while still catching most wallet activity.

        If background_full_rescan=True, a full rescan from genesis is triggered
        in the background after the initial import completes. This runs asynchronously
        and ensures no transactions are missed.

    Args:
        descriptors: List of output descriptors. Can be:
            - Simple strings: "wpkh(xpub.../0/*)"
            - Dicts with range:
              {"desc": "wpkh(xpub.../0/*)", "range": [0, DEFAULT_GAP_LIMIT - 1]}
        rescan: If True, rescan blockchain (behavior depends on smart_scan).
               If False, only track new transactions (timestamp="now").
        timestamp: Override timestamp. If None, uses smart calculation or 0/"now".
                  Can be Unix timestamp for partial rescan from specific time.
        smart_scan: If True and rescan=True, scan from ~1 year ago instead of genesis.
                   This allows quick startup. (default: True)
        background_full_rescan: If True and smart_scan=True, trigger full rescan
                               from genesis in background after import. (default: True)

    Returns:
        Import result from Bitcoin Core with additional 'background_rescan_started' key

    Example:
        # Smart scan (fast startup, background full rescan)
        await backend.import_descriptors([
            {
                "desc": "wpkh(xpub.../0/*)",
                "range": [0, DEFAULT_GAP_LIMIT - 1],
                "internal": False,
            },
        ], rescan=True, smart_scan=True)

        # Full rescan from genesis (slow but complete)
        await backend.import_descriptors([...], rescan=True, smart_scan=False)

        # No rescan (for brand new wallets with no history)
        await backend.import_descriptors([...], rescan=False)
    """
    if not self._wallet_loaded:
        raise RuntimeError("Wallet not loaded. Call create_wallet() first.")

    # Calculate appropriate timestamp
    background_rescan_needed = False
    if timestamp is None:
        if not rescan:
            timestamp = "now"
        elif smart_scan:
            # Smart scan: start from ~1 year ago for fast startup
            timestamp = await self._get_smart_scan_timestamp()
            background_rescan_needed = background_full_rescan
        else:
            # Full rescan from genesis
            timestamp = 0

    # Format descriptors for importdescriptors RPC
    import_requests = []
    for desc in descriptors:
        if isinstance(desc, str):
            # Add checksum if not present
            desc_with_checksum = await self._add_descriptor_checksum(desc)
            # Single address descriptors (addr(...)) cannot be active - they're not ranged
            is_ranged = "*" in desc or "range" in desc
            import_requests.append(
                {
                    "desc": desc_with_checksum,
                    "timestamp": timestamp,
                    "active": is_ranged,  # Only ranged descriptors can be active
                    "internal": False,
                }
            )
        elif isinstance(desc, dict):
            desc_str = desc.get("desc", "")
            desc_with_checksum = await self._add_descriptor_checksum(desc_str)
            # Determine if descriptor is ranged (has * wildcard or explicit range)
            is_ranged = "*" in desc_str or "range" in desc
            request = {
                "desc": desc_with_checksum,
                "timestamp": timestamp,
                "active": is_ranged,  # Only ranged descriptors can be active
            }
            if "range" in desc:
                request["range"] = desc["range"]
            if "internal" in desc:
                request["internal"] = desc["internal"]
            import_requests.append(request)

    if SENSITIVE_LOGGING:
        logger.debug(f"Importing {len(import_requests)} descriptor(s): {import_requests}")
    else:
        if timestamp == 0:
            rescan_info = "from genesis (timestamp=0)"
        elif timestamp == "now":
            rescan_info = "no rescan (timestamp='now')"
        elif smart_scan and background_rescan_needed:
            rescan_info = (
                f"smart scan from ~1 year ago (timestamp={timestamp}), "
                "full rescan in background"
            )
        else:
            rescan_info = f"timestamp={timestamp}"
        logger.info(
            f"Importing {len(import_requests)} descriptor(s) into wallet ({rescan_info})..."
        )

    try:
        result = await self._rpc_call(
            "importdescriptors", [import_requests], client=self._import_client
        )

        # Check for errors in results
        success_count = sum(1 for r in result if r.get("success", False))
        error_count = len(result) - success_count

        if error_count > 0:
            errors = [
                r.get("error", {}).get("message", "unknown")
                for r in result
                if not r.get("success", False)
            ]
            logger.warning(f"Import completed with {error_count} error(s): {errors}")
            # Log full results for debugging
            for i, r in enumerate(result):
                if not r.get("success", False):
                    logger.debug(f"  Descriptor {i} failed: {r}")
        else:
            logger.info(f"Successfully imported {success_count} descriptor(s)")

        # Verify import by listing descriptors
        try:
            verify_result = await self._rpc_call("listdescriptors")
            actual_count = len(verify_result.get("descriptors", []))
            logger.debug(f"Verification: wallet now has {actual_count} descriptor(s)")
            if actual_count == 0 and success_count > 0:
                logger.error(
                    f"CRITICAL: Import reported {success_count} successes but wallet has "
                    f"0 descriptors! This may indicate a Bitcoin Core bug or wallet issue."
                )
        except Exception as e:
            logger.warning(f"Could not verify descriptor import: {e}")

        self._descriptors_imported = True

        # Trigger background full rescan if needed
        background_rescan_started = False
        if background_rescan_needed and success_count > 0:
            try:
                await self.start_background_rescan()
                background_rescan_started = True
            except Exception as e:
                logger.warning(f"Failed to start background rescan: {e}")

        return {
            "success_count": success_count,
            "error_count": error_count,
            "results": result,
            "background_rescan_started": background_rescan_started,
        }

    except Exception as e:
        logger.error(f"Failed to import descriptors: {e}")
        raise

Import descriptors into the wallet.

This is the key operation that enables efficient UTXO tracking. Once imported, Bitcoin Core will automatically track all addresses derived from these descriptors.

Smart Scan Behavior (smart_scan=True): Instead of scanning from genesis (which can take 20+ minutes on mainnet), the smart scan imports descriptors with a timestamp ~1 year in the past. This allows quick startup while still catching most wallet activity.

If background_full_rescan=True, a full rescan from genesis is triggered
in the background after the initial import completes. This runs asynchronously
and ensures no transactions are missed.
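
The timestamp selection described above can be sketched as a small pure function. This is an illustrative sketch only: `choose_scan_plan` and its `smart_scan_timestamp` argument are hypothetical names, and the real method computes the smart scan timestamp internally rather than taking it as a parameter.

```python
from typing import Union


def choose_scan_plan(
    rescan: bool,
    smart_scan: bool,
    background_full_rescan: bool,
    smart_scan_timestamp: int,
) -> tuple[Union[int, str], bool]:
    """Return (timestamp, background_rescan_needed) when no explicit timestamp is given.

    smart_scan_timestamp is a hypothetical precomputed Unix time (~1 year ago).
    """
    if not rescan:
        # Only track new transactions; no historical scan at all.
        return "now", False
    if smart_scan:
        # Fast startup: scan recent history now, optionally the rest later.
        return smart_scan_timestamp, background_full_rescan
    # Full rescan from genesis; nothing is left for a background pass.
    return 0, False
```

Note that `background_full_rescan` only has an effect on the smart scan path; with `rescan=False` or `smart_scan=False` it is ignored.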

Args

descriptors
List of output descriptors. Can be:
- Simple strings: "wpkh(xpub…/0/*)"
- Dicts with range: {"desc": "wpkh(xpub…/0/*)", "range": [0, DEFAULT_GAP_LIMIT - 1]}
rescan
If True, rescan blockchain (behavior depends on smart_scan). If False, only track new transactions (timestamp="now").
timestamp
Override timestamp. If None, uses smart calculation or 0/"now". Can be Unix timestamp for partial rescan from specific time.
smart_scan
If True and rescan=True, scan from ~1 year ago instead of genesis. This allows quick startup. (default: True)
background_full_rescan
If True and smart_scan=True, trigger full rescan from genesis in background after import. (default: True)
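
As a rough illustration of how the two accepted descriptor forms map onto importdescriptors request entries, here is a simplified sketch. `build_import_request` is a hypothetical helper, the checksum step performed by the real method is omitted, and the `xpub...` strings are placeholders rather than valid keys.

```python
from typing import Any, Union

Descriptor = Union[str, dict[str, Any]]


def build_import_request(desc: Descriptor, timestamp: Union[int, str]) -> dict[str, Any]:
    """Build one request entry; the descriptor checksum is NOT added here."""
    if isinstance(desc, str):
        return {
            "desc": desc,
            "timestamp": timestamp,
            # Only ranged descriptors (with a * wildcard) can be active.
            "active": "*" in desc,
            "internal": False,
        }
    desc_str = desc.get("desc", "")
    request: dict[str, Any] = {
        "desc": desc_str,
        "timestamp": timestamp,
        "active": "*" in desc_str or "range" in desc,
    }
    # Pass through the optional keys only when the caller supplied them.
    for key in ("range", "internal"):
        if key in desc:
            request[key] = desc[key]
    return request
```

A plain string is always treated as an external (receive) descriptor, so change chains need the dict form with "internal": True.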

Returns

Dict with 'success_count', 'error_count', 'results' (the raw importdescriptors response from Bitcoin Core), and 'background_rescan_started'.
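
The success and error counts are derived from the per-descriptor entries that importdescriptors returns. A minimal sketch of that bookkeeping, with made-up sample data (the error message below is illustrative, not real Bitcoin Core output) and a hypothetical `summarize_import` helper:

```python
from typing import Any


def summarize_import(results: list[dict[str, Any]]) -> dict[str, Any]:
    """Count successes and collect error messages from per-descriptor results."""
    success_count = sum(1 for r in results if r.get("success", False))
    errors = [
        r.get("error", {}).get("message", "unknown")
        for r in results
        if not r.get("success", False)
    ]
    return {
        "success_count": success_count,
        "error_count": len(results) - success_count,
        "errors": errors,
    }


# Illustrative sample only; real entries may carry additional fields.
sample = [
    {"success": True},
    {"success": False, "error": {"message": "example failure"}},
]
summary = summarize_import(sample)
```

A caller would typically treat any non-zero error_count as a reason to inspect the individual entries under 'results'.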

Example

Smart scan (fast startup, background full rescan)

await backend.import_descriptors([
    {
        "desc": "wpkh(xpub…/0/*)",
        "range": [0, DEFAULT_GAP_LIMIT - 1],
        "internal": False,
    },
], rescan=True, smart_scan=True)

Full rescan from genesis (slow but complete)

await backend.import_descriptors([…], rescan=True, smart_scan=False)

No rescan (for brand new wallets with no history)

await backend.import_descriptors([…], rescan=False)

def is_background_rescan_pending(self) ‑> bool
def is_background_rescan_pending(self) -> bool:
    """Check if a background rescan was started and may still be running."""
    return self._background_rescan_height is not None

Check if a background rescan was started and may still be running.

async def is_wallet_setup(self, expected_descriptor_count: int | None = None) ‑> bool
async def is_wallet_setup(self, expected_descriptor_count: int | None = None) -> bool:
    """
    Check if wallet is already set up with imported descriptors.

    Args:
        expected_descriptor_count: If provided, verifies that at least this many
                                  descriptors are imported.
                                  For JoinMarket: 2 per mixdepth (external + internal)
                                  Example: 5 mixdepths = 10 descriptors minimum

    Returns:
        True if wallet exists and has descriptors imported

    Example:
        # Check if wallet is set up for 5 mixdepths
        if await backend.is_wallet_setup(expected_descriptor_count=10):
            # Already set up, just sync
            utxos = await wallet.sync_with_descriptor_wallet()
        else:
            # First time - import descriptors
            await wallet.setup_descriptor_wallet(rescan=True)
    """
    try:
        # Check if wallet exists and is loaded
        wallets = await self._rpc_call("listwallets", use_wallet=False)
        if self.wallet_name in wallets:
            self._wallet_loaded = True
        else:
            # Try to load it
            try:
                await self._rpc_call("loadwallet", [self.wallet_name], use_wallet=False)
                self._wallet_loaded = True
            except ValueError:
                return False

        # Check if descriptors are imported
        descriptors = await self.list_descriptors()
        if not descriptors:
            return False

        # If expected count provided, verify
        if expected_descriptor_count is not None:
            return len(descriptors) >= expected_descriptor_count

        return True

    except Exception as e:
        logger.debug(f"Wallet setup check failed: {e}")
        return False

Check if wallet is already set up with imported descriptors.

Args

expected_descriptor_count
If provided, verifies that at least this many descriptors are imported. For JoinMarket: 2 per mixdepth (external + internal); for example, 5 mixdepths = 10 descriptors minimum.

Returns

True if wallet exists and has descriptors imported

Example

Check if wallet is set up for 5 mixdepths

if await backend.is_wallet_setup(expected_descriptor_count=10):
    # Already set up, just sync
    utxos = await wallet.sync_with_descriptor_wallet()
else:
    # First time - import descriptors
    await wallet.setup_descriptor_wallet(rescan=True)
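
The descriptor-count arithmetic and the "at least" comparison can be sketched as follows. Both helpers are hypothetical and only mirror the check described above; they do not talk to Bitcoin Core.

```python
from typing import Any, Optional


def expected_descriptor_count(mixdepths: int) -> int:
    """Two descriptors per mixdepth: one external, one internal."""
    return 2 * mixdepths


def has_enough_descriptors(
    descriptors: list[dict[str, Any]],
    expected: Optional[int] = None,
) -> bool:
    """True if any descriptors exist and, when given, at least `expected` of them."""
    if not descriptors:
        return False
    if expected is not None:
        return len(descriptors) >= expected
    return True
```

Because the comparison is "greater than or equal", a wallet holding extra descriptors still counts as set up.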

async def list_descriptors(self) ‑> list[dict[str, typing.Any]]
async def list_descriptors(self) -> list[dict[str, Any]]:
    """
    List all descriptors currently imported in the wallet.

    Returns:
        List of descriptor info dicts with fields like 'desc', 'timestamp', 'active', etc.

    Example:
        descriptors = await backend.list_descriptors()
        for d in descriptors:
            print(f"Descriptor: {d['desc']}, Active: {d.get('active', False)}")
    """
    if not self._wallet_loaded:
        raise RuntimeError("Wallet not loaded. Call create_wallet() first.")

    try:
        result = await self._rpc_call("listdescriptors")
        return result.get("descriptors", [])
    except Exception as e:
        logger.error(f"Failed to list descriptors: {e}")
        raise

List all descriptors currently imported in the wallet.

Returns

List of descriptor info dicts with fields like 'desc', 'timestamp', 'active', etc.

Example

descriptors = await backend.list_descriptors()
for d in descriptors:
    print(f"Descriptor: {d['desc']}, Active: {d.get('active', False)}")
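
One possible way to post-process the returned list is to split active descriptors into external and internal chains. This sketch assumes each entry carries 'desc', 'active', and 'internal' keys; the sample entries use placeholder descriptor strings and `split_active_descriptors` is a hypothetical helper.

```python
from typing import Any


def split_active_descriptors(
    descriptors: list[dict[str, Any]],
) -> tuple[list[str], list[str]]:
    """Return (external, internal) descriptor strings for active entries only."""
    external: list[str] = []
    internal: list[str] = []
    for d in descriptors:
        if not d.get("active", False):
            continue  # skip inactive (e.g. non-ranged) descriptors
        (internal if d.get("internal", False) else external).append(d["desc"])
    return external, internal


# Placeholder data shaped like list_descriptors() output.
sample = [
    {"desc": "wpkh(.../0/*)", "active": True, "internal": False},
    {"desc": "wpkh(.../1/*)", "active": True, "internal": True},
    {"desc": "addr(...)", "active": False},
]
external, internal = split_active_descriptors(sample)
```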

async def rescan_blockchain(self, start_height: int = 0) ‑> dict[str, typing.Any]
async def rescan_blockchain(self, start_height: int = 0) -> dict[str, Any]:
    """
    Rescan blockchain from given height.

    Useful after importing new descriptors or recovering wallet.

    Args:
        start_height: Block height to start rescan from

    Returns:
        Rescan result
    """
    try:
        logger.info(f"Starting blockchain rescan from height {start_height}...")
        result = await self._rpc_call(
            "rescanblockchain",
            [start_height],
            client=self._import_client,  # Use longer timeout
        )
        logger.info(f"Rescan complete: {result}")
        return result
    except Exception as e:
        logger.error(f"Rescan failed: {e}")
        raise

Rescan blockchain from given height.

Useful after importing new descriptors or recovering wallet.

Args

start_height
Block height to start rescan from

Returns

Rescan result

async def setup_wallet(self,
descriptors: Sequence[str | dict[str, Any]],
rescan: bool = True,
smart_scan: bool = True,
background_full_rescan: bool = True) ‑> bool
async def setup_wallet(
    self,
    descriptors: Sequence[str | dict[str, Any]],
    rescan: bool = True,
    smart_scan: bool = True,
    background_full_rescan: bool = True,
) -> bool:
    """
    Complete wallet setup: create wallet and import descriptors.

    This is a convenience method for initial setup. By default, uses smart scan
    for fast startup with a background full rescan.

    Args:
        descriptors: Descriptors to import
        rescan: Whether to rescan blockchain
        smart_scan: If True and rescan=True, scan from ~1 year ago (fast startup)
        background_full_rescan: If True and smart_scan=True, run full rescan in background

    Returns:
        True if setup completed successfully
    """
    await self.create_wallet(disable_private_keys=True)
    await self.import_descriptors(
        descriptors,
        rescan=rescan,
        smart_scan=smart_scan,
        background_full_rescan=background_full_rescan,
    )
    return True

Complete wallet setup: create wallet and import descriptors.

This is a convenience method for initial setup. By default, uses smart scan for fast startup with a background full rescan.

Args

descriptors
Descriptors to import
rescan
Whether to rescan blockchain
smart_scan
If True and rescan=True, scan from ~1 year ago (fast startup)
background_full_rescan
If True and smart_scan=True, run full rescan in background

Returns

True if setup completed successfully

async def start_background_rescan(self, start_height: int = 0) ‑> None
async def start_background_rescan(self, start_height: int = 0) -> None:
    """
    Start a background blockchain rescan from the given height.

    This triggers a rescan that runs asynchronously in Bitcoin Core.
    The rescan will find any transactions that were missed by the
    initial smart scan (which only scans recent blocks).

    Unlike the synchronous rescan in import_descriptors, this method
    returns immediately and the rescan continues in the background.

    Args:
        start_height: Block height to start rescan from (default: 0 = genesis)
    """
    if not self._wallet_loaded:
        raise RuntimeError("Wallet not loaded. Call create_wallet() first.")

    try:
        logger.info(
            f"Starting background blockchain rescan from height {start_height}. "
            "This will run in the background and may take several minutes on mainnet."
        )

        # rescanblockchain runs in the background when called via RPC
        # We use a fire-and-forget approach with a short timeout client
        # to avoid blocking on the full rescan
        import asyncio

        # Create a task that won't block the caller
        # We don't await it - let it run in background
        asyncio.create_task(self._run_background_rescan(start_height))

        self._background_rescan_height = start_height

    except Exception as e:
        logger.error(f"Failed to start background rescan: {e}")
        raise

Start a background blockchain rescan from the given height.

This triggers a rescan that runs asynchronously in Bitcoin Core. The rescan will find any transactions that were missed by the initial smart scan (which only scans recent blocks).

Unlike the synchronous rescan in import_descriptors, this method returns immediately and the rescan continues in the background.
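
The "returns immediately" behavior relies on scheduling a task without awaiting it. The sketch below shows that pattern in isolation, with a short sleep standing in for the long-running rescan. `RescanTracker` is a hypothetical class, not part of this module. It also keeps a reference to each task, which the asyncio documentation recommends so that a fire-and-forget task is not garbage collected before it finishes.

```python
import asyncio


class RescanTracker:
    """Hypothetical stand-in showing fire-and-forget task scheduling."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()
        self.finished: list[int] = []

    async def _run(self, start_height: int) -> None:
        await asyncio.sleep(0.01)  # stand-in for the long-running rescan call
        self.finished.append(start_height)

    def start(self, start_height: int = 0) -> None:
        # Schedule the work and return at once; the caller is not blocked.
        task = asyncio.create_task(self._run(start_height))
        self._tasks.add(task)  # keep a strong reference while it runs
        task.add_done_callback(self._tasks.discard)

    async def wait_all(self) -> None:
        await asyncio.gather(*list(self._tasks))


async def demo() -> tuple[bool, list[int]]:
    tracker = RescanTracker()
    tracker.start(0)
    returned_before_finish = tracker.finished == []
    await tracker.wait_all()
    return returned_before_finish, tracker.finished
```

Running `asyncio.run(demo())` shows that `start()` returns before the work completes and that the result appears only after the task has been awaited.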

Args

start_height
Block height to start rescan from (default: 0 = genesis)

async def unload_wallet(self) ‑> None
async def unload_wallet(self) -> None:
    """Unload the wallet from Bitcoin Core."""
    if self._wallet_loaded:
        try:
            await self._rpc_call("unloadwallet", [self.wallet_name], use_wallet=False)
            logger.info(f"Unloaded wallet '{self.wallet_name}'")
            self._wallet_loaded = False
        except Exception as e:
            logger.warning(f"Failed to unload wallet: {e}")

Unload the wallet from Bitcoin Core.

async def upgrade_descriptor_ranges(self,
descriptors: Sequence[str | dict[str, Any]],
new_range_end: int,
rescan: bool = False) ‑> dict[str, typing.Any]
async def upgrade_descriptor_ranges(
    self,
    descriptors: Sequence[str | dict[str, Any]],
    new_range_end: int,
    rescan: bool = False,
) -> dict[str, Any]:
    """
    Upgrade descriptor ranges to track more addresses.

    This re-imports existing descriptors with a larger range. Bitcoin Core
    will automatically track the new addresses without re-scanning the entire
    blockchain (unless rescan=True is specified).

    This is useful when a wallet has grown beyond the initially imported range.
    For example, a wallet originally imported with range [0, 999] may now need
    to track addresses up to index 5000.

    Args:
        descriptors: List of descriptors to upgrade (same format as import_descriptors)
        new_range_end: New end index for the range (e.g., 5000 for [0, 5000])
        rescan: Whether to rescan blockchain for the new addresses.
               Usually not needed if wallet was already tracking some range.

    Returns:
        Import result from Bitcoin Core

    Note:
        Re-importing with a larger range is safe - Bitcoin Core will extend
        the tracking without duplicating or losing existing data.
    """
    if not self._wallet_loaded:
        raise RuntimeError("Wallet not loaded. Call create_wallet() first.")

    # Update ranges in descriptor dicts
    updated_descriptors = []
    for desc in descriptors:
        if isinstance(desc, str):
            # String descriptor - add range
            updated_descriptors.append(
                {
                    "desc": desc,
                    "range": [0, new_range_end],
                }
            )
        elif isinstance(desc, dict):
            # Dict descriptor - update range
            updated = dict(desc)
            if "*" in updated.get("desc", ""):  # Only ranged descriptors
                updated["range"] = [0, new_range_end]
            updated_descriptors.append(updated)

    logger.info(
        f"Upgrading {len(updated_descriptors)} descriptor(s) to range [0, {new_range_end}]"
    )

    # Re-import with new range
    # timestamp="now" means don't rescan unless explicitly requested
    return await self.import_descriptors(
        updated_descriptors,
        rescan=rescan,
        timestamp=0 if rescan else "now",
        smart_scan=False,  # Don't use smart scan for upgrades
        background_full_rescan=False,
    )

Upgrade descriptor ranges to track more addresses.

This re-imports existing descriptors with a larger range. Bitcoin Core will automatically track the new addresses without re-scanning the entire blockchain (unless rescan=True is specified).

This is useful when a wallet has grown beyond the initially imported range. For example, a wallet originally imported with range [0, 999] may now need to track addresses up to index 5000.

Args

descriptors
List of descriptors to upgrade (same format as import_descriptors)
new_range_end
New end index for the range (e.g., 5000 for [0, 5000])
rescan
Whether to rescan blockchain for the new addresses. Usually not needed if wallet was already tracking some range.

Returns

Import result from Bitcoin Core

Note

Re-importing with a larger range is safe - Bitcoin Core will extend the tracking without duplicating or losing existing data.
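
The range rewrite that happens before re-import can be sketched as a pure function. `widen_ranges` is a hypothetical name, the `xpub...` strings are placeholders, and the sketch covers only the range update, not the import itself.

```python
from typing import Any, Sequence, Union


def widen_ranges(
    descriptors: Sequence[Union[str, dict[str, Any]]],
    new_range_end: int,
) -> list[dict[str, Any]]:
    """Return new descriptor dicts whose ranges cover [0, new_range_end]."""
    updated: list[dict[str, Any]] = []
    for desc in descriptors:
        if isinstance(desc, str):
            # String form: wrap in a dict and attach the new range.
            updated.append({"desc": desc, "range": [0, new_range_end]})
        elif isinstance(desc, dict):
            entry = dict(desc)  # copy so the caller's dict is not mutated
            if "*" in entry.get("desc", ""):  # only ranged descriptors
                entry["range"] = [0, new_range_end]
            updated.append(entry)
    return updated


old = [
    "wpkh(xpub.../0/*)",
    {"desc": "wpkh(xpub.../1/*)", "range": [0, 999], "internal": True},
]
new = widen_ranges(old, 5000)
```

Dict descriptors without a wildcard are passed through unchanged, while string descriptors always receive a range, so the string form is best reserved for ranged descriptors.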

async def wait_for_rescan_complete(self,
poll_interval: float = 5.0,
timeout: float | None = None,
progress_callback: Callable[[float], None] | None = None) ‑> bool
async def wait_for_rescan_complete(
    self,
    poll_interval: float = 5.0,
    timeout: float | None = None,
    progress_callback: Callable[[float], None] | None = None,
) -> bool:
    """
    Wait for any ongoing wallet rescan to complete.

    This is useful after importing descriptors with rescan=True to ensure
    the wallet is fully synced before querying UTXOs.

    Args:
        poll_interval: How often to check rescan status (seconds)
        timeout: Maximum time to wait (seconds). None = wait indefinitely.
        progress_callback: Optional callback(progress) called with progress 0.0-1.0

    Returns:
        True if rescan completed, False if timed out
    """
    import time

    start_time = time.time()

    while True:
        status = await self.get_rescan_status()

        if status is None or not status.get("in_progress", False):
            # No rescan in progress, we're done
            return True

        progress = status.get("progress", 0)
        if progress_callback:
            progress_callback(progress)

        logger.debug(f"Rescan in progress: {progress:.1%}")

        if timeout is not None and (time.time() - start_time) > timeout:
            logger.warning(f"Rescan wait timed out after {timeout}s")
            return False

        await asyncio.sleep(poll_interval)

Wait for any ongoing wallet rescan to complete.

This is useful after importing descriptors with rescan=True to ensure the wallet is fully synced before querying UTXOs.

Args

poll_interval
How often to check rescan status (seconds)
timeout
Maximum time to wait (seconds). None = wait indefinitely.
progress_callback
Optional callback(progress) called with progress 0.0-1.0

Returns

True if rescan completed, False if timed out
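
The polling pattern behind this method can be sketched independently of Bitcoin Core. In the sketch below, `wait_until_idle` and the `get_status` callable are hypothetical; the status dicts are assumed to have the 'in_progress' and 'progress' keys used above, and `time.monotonic()` replaces `time.time()` as one way to make the timeout immune to wall-clock adjustments.

```python
import asyncio
import time
from typing import Any, Awaitable, Callable, Optional

StatusFn = Callable[[], Awaitable[Optional[dict[str, Any]]]]


async def wait_until_idle(
    get_status: StatusFn,
    poll_interval: float = 5.0,
    timeout: Optional[float] = None,
    progress_callback: Optional[Callable[[float], None]] = None,
) -> bool:
    """Poll get_status until no rescan is in progress; False on timeout."""
    start = time.monotonic()
    while True:
        status = await get_status()
        if status is None or not status.get("in_progress", False):
            return True
        if progress_callback:
            progress_callback(status.get("progress", 0.0))
        if timeout is not None and time.monotonic() - start > timeout:
            return False
        await asyncio.sleep(poll_interval)


async def demo() -> tuple[bool, list[float]]:
    # Fake status source: two in-progress reports, then idle.
    statuses = iter([
        {"in_progress": True, "progress": 0.25},
        {"in_progress": True, "progress": 0.75},
        None,
    ])

    async def fake_status() -> Optional[dict[str, Any]]:
        return next(statuses)

    seen: list[float] = []
    done = await wait_until_idle(fake_status, poll_interval=0.001,
                                 progress_callback=seen.append)
    return done, seen
```

The timeout is checked once per poll, so the actual wait can exceed `timeout` by up to one `poll_interval` plus the duration of a status call.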

class NeutrinoBackend (neutrino_url: str = 'http://127.0.0.1:8334',
network: str = 'mainnet',
connect_peers: list[str] | None = None,
data_dir: str = '/data/neutrino')
class NeutrinoBackend(BlockchainBackend):
    """
    Blockchain backend using Neutrino light client.

    Neutrino is a privacy-preserving Bitcoin light client that uses
    BIP157/BIP158 compact block filters instead of traditional SPV.

    Communication with the neutrino daemon is via REST API.
    The neutrino daemon should be running alongside this client.
    """

    def __init__(
        self,
        neutrino_url: str = "http://127.0.0.1:8334",
        network: str = "mainnet",
        connect_peers: list[str] | None = None,
        data_dir: str = "/data/neutrino",
    ):
        """
        Initialize Neutrino backend.

        Args:
            neutrino_url: URL of the neutrino REST API (default port 8334)
            network: Bitcoin network (mainnet, testnet, regtest, signet)
            connect_peers: List of peer addresses to connect to (optional)
            data_dir: Directory for neutrino data (headers, filters)
        """
        self.neutrino_url = neutrino_url.rstrip("/")
        self.network = network
        self.connect_peers = connect_peers or []
        self.data_dir = data_dir
        self.client = httpx.AsyncClient(timeout=60.0)

        # Cache for watched addresses (neutrino needs to know what to scan for)
        self._watched_addresses: set[str] = set()
        self._watched_outpoints: set[tuple[str, int]] = set()

        # Security limits to prevent DoS via excessive watch list / rescan abuse
        self._max_watched_addresses: int = 10000  # Maximum addresses to track
        self._max_rescan_depth: int = 100000  # Maximum blocks to rescan (roughly 2 years)
        self._min_valid_blockheight: int = 481824  # SegWit activation (mainnet)
        # For testnet/regtest, this will be adjusted based on network

        # Block filter cache
        self._filter_header_tip: int = 0
        self._synced: bool = False

        # Track if we've done the initial rescan
        self._initial_rescan_done: bool = False

        # Track the last block height we rescanned to (for incremental rescans)
        self._last_rescan_height: int = 0

        # Track if we just triggered a rescan (to avoid waiting multiple times)
        self._rescan_in_progress: bool = False

        # Track if we just completed a rescan (to enable retry logic for async UTXO lookups)
        self._just_rescanned: bool = False

        # Adjust minimum blockheight based on network
        if network == "regtest":
            self._min_valid_blockheight = 0  # Regtest can have any height
        elif network == "testnet":
            self._min_valid_blockheight = 834624  # Approximate SegWit on testnet
        elif network == "signet":
            self._min_valid_blockheight = 0  # Signet started with SegWit

    async def _api_call(
        self,
        method: str,
        endpoint: str,
        params: dict[str, Any] | None = None,
        data: dict[str, Any] | None = None,
    ) -> Any:
        """Make an API call to the neutrino daemon."""
        url = f"{self.neutrino_url}/{endpoint}"

        try:
            if method == "GET":
                response = await self.client.get(url, params=params)
            elif method == "POST":
                response = await self.client.post(url, json=data)
            else:
                raise ValueError(f"Unsupported HTTP method: {method}")

            response.raise_for_status()
            return response.json()

        except httpx.HTTPStatusError as e:
            # 404 responses are expected during normal operation (unconfirmed txs, spent UTXOs)
            # Don't log them as errors to avoid confusing users
            if e.response.status_code == 404:
                logger.debug(f"Neutrino API returned 404: {endpoint}")
            else:
                logger.error(f"Neutrino API call failed: {endpoint} - {e}")
            raise
        except httpx.HTTPError as e:
            logger.error(f"Neutrino API call failed: {endpoint} - {e}")
            raise

    async def wait_for_sync(self, timeout: float = 300.0) -> bool:
        """
        Wait for neutrino to sync block headers and filters.

        Args:
            timeout: Maximum time to wait in seconds

        Returns:
            True if synced, False if timeout
        """
        start_time = asyncio.get_event_loop().time()

        while True:
            try:
                status = await self._api_call("GET", "v1/status")
                synced = status.get("synced", False)
                block_height = status.get("block_height", 0)
                filter_height = status.get("filter_height", 0)

                if synced and block_height == filter_height:
                    self._synced = True
                    self._filter_header_tip = block_height
                    logger.info(f"Neutrino synced at height {block_height}")
                    return True

                logger.debug(f"Syncing... blocks: {block_height}, filters: {filter_height}")

            except Exception as e:
                logger.warning(f"Waiting for neutrino daemon: {e}")

            elapsed = asyncio.get_event_loop().time() - start_time
            if elapsed > timeout:
                logger.error("Neutrino sync timeout")
                return False

            await asyncio.sleep(2.0)

    async def add_watch_address(self, address: str) -> None:
        """
        Add an address to the local watch list.

        In neutrino-api v0.4, address watching is implicit - you just query
        UTXOs or do rescans with the addresses you care about. This method
        tracks addresses locally for convenience.

        Security: Limits the number of watched addresses to prevent memory
        exhaustion attacks.

        Args:
            address: Bitcoin address to watch

        Raises:
            ValueError: If watch list limit exceeded
        """
        if address in self._watched_addresses:
            return

        if len(self._watched_addresses) >= self._max_watched_addresses:
            logger.warning(
                f"Watch list limit reached ({self._max_watched_addresses}). "
                f"Cannot add address: {address[:20]}..."
            )
            raise ValueError(f"Watch list limit ({self._max_watched_addresses}) exceeded")

        self._watched_addresses.add(address)
        logger.trace(f"Watching address: {address}")

    async def add_watch_outpoint(self, txid: str, vout: int) -> None:
        """
        Add an outpoint to the local watch list.

        In neutrino-api v0.4, outpoint watching is done via UTXO queries
        with the address parameter. This method tracks outpoints locally.

        Args:
            txid: Transaction ID
            vout: Output index
        """
        outpoint = (txid, vout)
        if outpoint in self._watched_outpoints:
            return

        self._watched_outpoints.add(outpoint)
        logger.debug(f"Watching outpoint: {txid}:{vout}")

    async def get_utxos(self, addresses: list[str]) -> list[UTXO]:
        """
        Get UTXOs for given addresses using neutrino's rescan capability.

        Neutrino will scan the blockchain using compact block filters
        to find transactions relevant to the watched addresses.

        On first call, triggers a full blockchain rescan from genesis to ensure
        all historical UTXOs are found (critical for wallets funded before neutrino started).

        After initial rescan, automatically rescans if new blocks have arrived
        to detect transactions that occurred after the last scan.
        """
        utxos: list[UTXO] = []

        # Add addresses to watch list
        for address in addresses:
            await self.add_watch_address(address)

        # Get current tip height to check if new blocks have arrived
        current_height = await self.get_block_height()

        # On first UTXO query, trigger a full blockchain rescan to find existing UTXOs
        # This is critical for wallets that were funded before neutrino was watching them
        logger.debug(
            f"get_utxos: _initial_rescan_done={self._initial_rescan_done}, "
            f"watched_addresses={len(self._watched_addresses)}, "
            f"last_rescan={self._last_rescan_height}, current={current_height}"
        )
        if not self._initial_rescan_done and self._watched_addresses:
            logger.info(
                f"Performing initial blockchain rescan for {len(self._watched_addresses)} "
                "watched addresses (this may take a moment)..."
            )
            try:
                # Trigger rescan from block 0 for all watched addresses
                await self._api_call(
                    "POST",
                    "v1/rescan",
                    data={
                        "addresses": list(self._watched_addresses),
                        "start_height": 0,
                    },
                )
                # Wait for rescan to complete (neutrino processes this asynchronously)
                # On regtest with ~3000 blocks, this typically takes 5-10 seconds
                await asyncio.sleep(10.0)
                self._initial_rescan_done = True
                self._last_rescan_height = current_height
                self._rescan_in_progress = False
                self._just_rescanned = True
                logger.info("Initial blockchain rescan completed")
            except Exception as e:
                logger.warning(f"Initial rescan failed (will retry on next sync): {e}")
                self._rescan_in_progress = False
        elif current_height > self._last_rescan_height and not self._rescan_in_progress:
            # New blocks have arrived since last rescan - need to scan them
            # This is critical for finding CoinJoin outputs that were just confirmed
            # We rescan ALL watched addresses, not just the ones in the current query,
            # because wallet sync happens mixdepth by mixdepth and we need to find
            # outputs to any of our addresses
            self._rescan_in_progress = True
            logger.info(
                f"New blocks detected ({self._last_rescan_height} -> {current_height}), "
                f"rescanning for {len(self._watched_addresses)} watched addresses..."
            )
            try:
                # Rescan from just before the last known height to catch edge cases
                start_height = max(0, self._last_rescan_height - 1)

                await self._api_call(
                    "POST",
                    "v1/rescan",
                    data={
                        "addresses": list(self._watched_addresses),
                        "start_height": start_height,
                    },
                )
                # Wait for rescan to complete
                # NOTE: The rescan is asynchronous - neutrino needs time to:
                # 1. Match block filters
                # 2. Download full blocks that match
                # 3. Extract and index UTXOs
                # We wait 7 seconds to allow time for indexing to complete
                await asyncio.sleep(7.0)

                self._last_rescan_height = current_height
                self._rescan_in_progress = False
                self._just_rescanned = True
                logger.info(
                    f"Incremental rescan completed from block {start_height} to {current_height}"
                )
            except Exception as e:
                logger.warning(f"Incremental rescan failed: {e}")
                self._rescan_in_progress = False
        elif self._rescan_in_progress:
            # A rescan was just triggered by a previous get_utxos call in this batch
            # Wait a bit for it to complete, but don't wait the full 7 seconds
            logger.debug("Rescan in progress from previous query, waiting briefly...")
            await asyncio.sleep(1.0)
        else:
            # No new blocks, just wait for filter matching / async UTXO lookups
            await asyncio.sleep(0.5)

        try:
            # Request UTXO scan for addresses with retry logic
            # The neutrino API performs UTXO lookups asynchronously, so we may need
            # to retry if the initial query happens before async indexing completes.
            # We only retry if we just completed a rescan (indicated by _just_rescanned flag)
            # to avoid unnecessary delays when scanning addresses that have no UTXOs.
            max_retries = 5 if self._just_rescanned else 1
            result: dict[str, Any] = {"utxos": []}

            for retry in range(max_retries):
                result = await self._api_call(
                    "POST",
                    "v1/utxos",
                    data={"addresses": addresses},
                )

                utxo_count = len(result.get("utxos", []))

                # If we found UTXOs or this is the last retry, proceed
                if utxo_count > 0 or retry == max_retries - 1:
                    if retry > 0 and self._just_rescanned:
                        logger.debug(f"Found {utxo_count} UTXOs after {retry + 1} attempts")
                    break

                # No UTXOs yet - wait with exponential backoff before retrying
                # This allows time for async UTXO indexing to complete
                wait_time = 1.5**retry  # 1.0s, 1.5s, 2.25s, 3.375s (the final attempt never waits)
                logger.debug(
                    f"No UTXOs found on attempt {retry + 1}/{max_retries}, "
                    f"waiting {wait_time:.2f}s for async indexing..."
                )
                await asyncio.sleep(wait_time)

            # Reset the flag after we've completed the UTXO query
            # (subsequent queries in this batch won't need full retry)
            if self._just_rescanned:
                self._just_rescanned = False

            tip_height = await self.get_block_height()

            for utxo_data in result.get("utxos", []):
                height = utxo_data.get("height", 0)
                confirmations = 0
                if height > 0:
                    confirmations = tip_height - height + 1

                utxo = UTXO(
                    txid=utxo_data["txid"],
                    vout=utxo_data["vout"],
                    value=utxo_data["value"],
                    address=utxo_data.get("address", ""),
                    confirmations=confirmations,
                    scriptpubkey=utxo_data.get("scriptpubkey", ""),
                    height=height if height > 0 else None,
                )
                utxos.append(utxo)

            logger.debug(f"Found {len(utxos)} UTXOs for {len(addresses)} addresses")

        except Exception as e:
            logger.error(f"Failed to fetch UTXOs: {e}")

        return utxos

    async def get_address_balance(self, address: str) -> int:
        """Get balance for an address in satoshis."""
        utxos = await self.get_utxos([address])
        balance = sum(utxo.value for utxo in utxos)
        logger.debug(f"Balance for {address}: {balance} sats")
        return balance

    async def broadcast_transaction(self, tx_hex: str) -> str:
        """
        Broadcast transaction via neutrino to the P2P network.

        Neutrino maintains P2P connections and can broadcast transactions
        directly to connected peers.
        """
        try:
            result = await self._api_call(
                "POST",
                "v1/tx/broadcast",
                data={"tx_hex": tx_hex},
            )
            txid = result.get("txid", "")
            logger.info(f"Broadcast transaction: {txid}")
            return txid

        except Exception as e:
            logger.error(f"Failed to broadcast transaction: {e}")
            raise ValueError(f"Broadcast failed: {e}") from e

    async def get_transaction(self, txid: str) -> Transaction | None:
        """
        Get transaction by txid.

        Note: Neutrino uses compact block filters (BIP158) and can only fetch
        transactions for addresses it has rescanned. It cannot fetch arbitrary
        transactions by txid alone. This method always returns None.

        For verification after broadcast, rely on UTXO checks with known addresses
        and block heights instead.
        """
        # Neutrino doesn't support fetching arbitrary transactions by txid
        # It can only work with UTXOs for known addresses via compact filters
        return None

    async def verify_tx_output(
        self,
        txid: str,
        vout: int,
        address: str,
        start_height: int | None = None,
    ) -> bool:
        """
        Verify that a specific transaction output exists using neutrino's UTXO endpoint.

        Uses GET /v1/utxo/{txid}/{vout}?address=...&start_height=... to check if
        the output exists. This works because neutrino uses compact block filters
        that can match on addresses.

        Args:
            txid: Transaction ID to verify
            vout: Output index to check
            address: The address that should own this output
            start_height: Block height hint for efficient scanning (recommended)

        Returns:
            True if the output exists, False otherwise
        """
        try:
            params: dict[str, str | int] = {"address": address}
            if start_height is not None:
                params["start_height"] = start_height

            result = await self._api_call(
                "GET",
                f"v1/utxo/{txid}/{vout}",
                params=params,
            )

            # If we got a response with unspent status, the output exists
            # Note: Even spent outputs confirm the transaction was broadcast
            if result is not None:
                logger.debug(
                    f"Verified tx output {txid}:{vout} exists "
                    f"(unspent={result.get('unspent', 'unknown')})"
                )
                return True

            return False

        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                # Output not found
                logger.debug(f"Tx output {txid}:{vout} not found")
                return False
            logger.warning(f"Error verifying tx output {txid}:{vout}: {e}")
            return False
        except Exception as e:
            logger.warning(f"Error verifying tx output {txid}:{vout}: {e}")
            return False

    async def estimate_fee(self, target_blocks: int) -> float:
        """
        Estimate fee in sat/vbyte for target confirmation blocks.

        Neutrino does not support fee estimation - returns conservative defaults.
        Use can_estimate_fee() to check if reliable estimation is available.
        """
        # Neutrino cannot estimate fees - return conservative defaults
        if target_blocks <= 1:
            return 5.0
        elif target_blocks <= 3:
            return 2.0
        elif target_blocks <= 6:
            return 1.0
        else:
            return 1.0

    def can_estimate_fee(self) -> bool:
        """Neutrino cannot reliably estimate fees - requires full node."""
        return False

    def has_mempool_access(self) -> bool:
        """Neutrino cannot access mempool - only sees confirmed transactions.

        BIP157/158 compact block filters only match confirmed blocks.
        Unconfirmed transactions in the mempool are not visible to Neutrino.

        This means verify_tx_output() will return False for valid transactions
        that are in the mempool but not yet confirmed. Takers using Neutrino
        must use alternative verification strategies (e.g., trust maker ACKs,
        multi-maker broadcast, wait for confirmation).
        """
        return False

    async def get_block_height(self) -> int:
        """Get current blockchain height from neutrino."""
        try:
            result = await self._api_call("GET", "v1/status")
            height = result.get("block_height", 0)
            logger.debug(f"Current block height: {height}")
            return height

        except Exception as e:
            logger.error(f"Failed to fetch block height: {e}")
            raise

    async def get_block_time(self, block_height: int) -> int:
        """Get block time (unix timestamp) for given height."""
        try:
            result = await self._api_call(
                "GET",
                f"v1/block/{block_height}/header",
            )
            timestamp = result.get("timestamp", 0)
            logger.debug(f"Block {block_height} timestamp: {timestamp}")
            return timestamp

        except Exception as e:
            logger.error(f"Failed to fetch block time for height {block_height}: {e}")
            raise

    async def get_block_hash(self, block_height: int) -> str:
        """Get block hash for given height."""
        try:
            result = await self._api_call(
                "GET",
                f"v1/block/{block_height}/header",
            )
            block_hash = result.get("hash", "")
            logger.debug(f"Block hash for height {block_height}: {block_hash}")
            return block_hash

        except Exception as e:
            logger.error(f"Failed to fetch block hash for height {block_height}: {e}")
            raise

    async def get_utxo(self, txid: str, vout: int) -> UTXO | None:
        """Get a specific UTXO from the blockchain.

        This backend always returns None: Neutrino cannot look up a UTXO
        from its outpoint alone (see the comment below)."""
        # Neutrino uses compact block filters and cannot perform arbitrary
        # UTXO lookups without the address. The API endpoint v1/utxo/{txid}/{vout}
        # requires the 'address' parameter to scan filter matches.
        #
        # If we don't have the address, we can't look it up.
        # Callers should use verify_utxo_with_metadata() instead.
        return None

    def requires_neutrino_metadata(self) -> bool:
        """
        Neutrino backend requires metadata for arbitrary UTXO verification.

        Without scriptPubKey and blockheight hints, Neutrino cannot verify
        UTXOs that it hasn't been watching from the start.

        Returns:
            True - Neutrino always requires metadata for counterparty UTXOs
        """
        return True

    def can_provide_neutrino_metadata(self) -> bool:
        """
        Neutrino backend cannot reliably provide metadata for all UTXOs.

        Light clients can only provide metadata for UTXOs they've been watching.
        They cannot provide metadata for arbitrary addresses like full nodes can.

        Returns:
            False - Neutrino cannot provide metadata for arbitrary UTXOs
        """
        return False

    async def verify_utxo_with_metadata(
        self,
        txid: str,
        vout: int,
        scriptpubkey: str,
        blockheight: int,
    ) -> UTXOVerificationResult:
        """
        Verify a UTXO using provided metadata (neutrino_compat feature).

        This is the key method that enables Neutrino light clients to verify
        counterparty UTXOs in CoinJoin without arbitrary blockchain queries.

        Uses the neutrino-api v0.4 UTXO check endpoint which requires:
        - address: The Bitcoin address that owns the UTXO (derived from scriptPubKey)
        - start_height: Block height to start scanning from (for efficiency)

        The API scans from start_height to chain tip using compact block filters
        to determine if the UTXO exists and whether it has been spent.

        Security: Validates blockheight to prevent rescan abuse attacks where
        malicious peers provide very low blockheights to trigger expensive rescans.

        Args:
            txid: Transaction ID
            vout: Output index
            scriptpubkey: Expected scriptPubKey (hex) - used to derive address
            blockheight: Block height where UTXO was confirmed - scan start hint

        Returns:
            UTXOVerificationResult with verification status and UTXO data
        """
        # Security: Validate blockheight to prevent rescan abuse
        tip_height = await self.get_block_height()

        if blockheight < self._min_valid_blockheight:
            return UTXOVerificationResult(
                valid=False,
                error=f"Blockheight {blockheight} is below minimum valid height "
                f"{self._min_valid_blockheight} for {self.network}",
            )

        if blockheight > tip_height:
            return UTXOVerificationResult(
                valid=False,
                error=f"Blockheight {blockheight} is in the future (tip: {tip_height})",
            )

        # Limit rescan depth to prevent DoS
        rescan_depth = tip_height - blockheight
        if rescan_depth > self._max_rescan_depth:
            return UTXOVerificationResult(
                valid=False,
                error=f"Rescan depth {rescan_depth} exceeds max {self._max_rescan_depth}. "
                f"UTXO too old for efficient verification.",
            )

        logger.debug(
            f"Verifying UTXO {txid}:{vout} with metadata "
            f"(scriptpubkey={scriptpubkey[:20]}..., blockheight={blockheight})"
        )

        # Step 1: Derive address from scriptPubKey
        # The neutrino-api v0.4 requires the address for UTXO lookup
        address = self._scriptpubkey_to_address(scriptpubkey)
        if not address:
            return UTXOVerificationResult(
                valid=False,
                error=f"Could not derive address from scriptPubKey: {scriptpubkey[:40]}...",
            )

        logger.debug(f"Derived address {address} from scriptPubKey")

        try:
            # Step 2: Query the specific UTXO using the v0.4 API
            # GET /v1/utxo/{txid}/{vout}?address=...&start_height=...
            #
            # The start_height parameter is critical for performance:
            # - Scanning 1 block takes ~0.01s
            # - Scanning 100 blocks takes ~0.5s
            # - Scanning 10,000+ blocks can take minutes
            #
            # We use blockheight - 1 as a safety margin in case of reorgs
            start_height = max(0, blockheight - 1)

            result = await self._api_call(
                "GET",
                f"v1/utxo/{txid}/{vout}",
                params={"address": address, "start_height": start_height},
            )

            # Check if UTXO is unspent
            if not result.get("unspent", False):
                spending_txid = result.get("spending_txid", "unknown")
                spending_height = result.get("spending_height", "unknown")
                return UTXOVerificationResult(
                    valid=False,
                    error=f"UTXO has been spent in tx {spending_txid} at height {spending_height}",
                )

            # Step 3: Verify scriptPubKey matches
            actual_scriptpubkey = result.get("scriptpubkey", "")
            scriptpubkey_matches = actual_scriptpubkey.lower() == scriptpubkey.lower()

            if not scriptpubkey_matches:
                return UTXOVerificationResult(
                    valid=False,
                    value=result.get("value", 0),
                    error=f"ScriptPubKey mismatch: expected {scriptpubkey[:20]}..., "
                    f"got {actual_scriptpubkey[:20]}...",
                    scriptpubkey_matches=False,
                )

            # Step 4: Calculate confirmations
            tip_height = await self.get_block_height()
            # The blockheight parameter is the confirmation height hint from the peer
            confirmations = tip_height - blockheight + 1 if blockheight > 0 else 0

            logger.info(
                f"UTXO {txid}:{vout} verified: value={result.get('value', 0)}, "
                f"confirmations={confirmations}"
            )

            return UTXOVerificationResult(
                valid=True,
                value=result.get("value", 0),
                confirmations=confirmations,
                scriptpubkey_matches=True,
            )

        except httpx.HTTPStatusError as e:
            if e.response.status_code == 404:
                return UTXOVerificationResult(
                    valid=False,
                    error="UTXO not found - may not exist or address derivation failed",
                )
            return UTXOVerificationResult(
                valid=False,
                error=f"UTXO query failed: {e}",
            )
        except Exception as e:
            return UTXOVerificationResult(
                valid=False,
                error=f"Verification failed: {e}",
            )

    def _scriptpubkey_to_address(self, scriptpubkey: str) -> str | None:
        """
        Convert scriptPubKey to address for watch list.

        Supports common script types:
        - P2WPKH: 0014<20-byte-hash> -> bc1q...
        - P2WSH: 0020<32-byte-hash> -> bc1q...
        - P2PKH: 76a914<20-byte-hash>88ac -> 1...
        - P2SH: a914<20-byte-hash>87 -> 3...

        Args:
            scriptpubkey: Hex-encoded scriptPubKey

        Returns:
            Bitcoin address or None if conversion fails
        """
        try:
            script_bytes = bytes.fromhex(scriptpubkey)

            # P2WPKH: OP_0 <20 bytes>
            if len(script_bytes) == 22 and script_bytes[0] == 0x00 and script_bytes[1] == 0x14:
                # Use bech32 encoding
                return self._encode_bech32_address(script_bytes[2:], 0)

            # P2WSH: OP_0 <32 bytes>
            if len(script_bytes) == 34 and script_bytes[0] == 0x00 and script_bytes[1] == 0x20:
                return self._encode_bech32_address(script_bytes[2:], 0)

            # P2PKH: OP_DUP OP_HASH160 <20 bytes> OP_EQUALVERIFY OP_CHECKSIG
            # NOTE: version byte 0x00 is mainnet-only; testnet, signet, and regtest use 0x6F
            if (
                len(script_bytes) == 25
                and script_bytes[0] == 0x76
                and script_bytes[1] == 0xA9
                and script_bytes[2] == 0x14
                and script_bytes[23] == 0x88
                and script_bytes[24] == 0xAC
            ):
                return self._encode_base58check_address(script_bytes[3:23], 0x00)

            # P2SH: OP_HASH160 <20 bytes> OP_EQUAL
            # NOTE: version byte 0x05 is mainnet-only; testnet, signet, and regtest use 0xC4
            if (
                len(script_bytes) == 23
                and script_bytes[0] == 0xA9
                and script_bytes[1] == 0x14
                and script_bytes[22] == 0x87
            ):
                return self._encode_base58check_address(script_bytes[2:22], 0x05)

            logger.warning(f"Unknown scriptPubKey format: {scriptpubkey[:20]}...")
            return None

        except Exception as e:
            logger.warning(f"Failed to convert scriptPubKey to address: {e}")
            return None

    def _encode_bech32_address(self, witness_program: bytes, witness_version: int) -> str:
        """Encode witness program as bech32 address."""
        # Simplified bech32 encoding - in production use a proper library
        hrp = "bc" if self.network == "mainnet" else "bcrt" if self.network == "regtest" else "tb"

        # Convert witness program to 5-bit groups
        def convertbits(data: bytes, frombits: int, tobits: int, pad: bool = True) -> list[int]:
            acc = 0
            bits = 0
            ret = []
            maxv = (1 << tobits) - 1
            for value in data:
                acc = (acc << frombits) | value
                bits += frombits
                while bits >= tobits:
                    bits -= tobits
                    ret.append((acc >> bits) & maxv)
            if pad and bits:
                ret.append((acc << (tobits - bits)) & maxv)
            return ret

        charset = "qpzry9x8gf2tvdw0s3jn54khce6mua7l"

        def bech32_polymod(values: list[int]) -> int:
            gen = [0x3B6A57B2, 0x26508E6D, 0x1EA119FA, 0x3D4233DD, 0x2A1462B3]
            chk = 1
            for v in values:
                b = chk >> 25
                chk = ((chk & 0x1FFFFFF) << 5) ^ v
                for i in range(5):
                    chk ^= gen[i] if ((b >> i) & 1) else 0
            return chk

        def bech32_hrp_expand(hrp: str) -> list[int]:
            return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp]

        def bech32_create_checksum(hrp: str, data: list[int]) -> list[int]:
            values = bech32_hrp_expand(hrp) + data
            polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1
            return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)]

        data = [witness_version] + convertbits(witness_program, 8, 5)
        checksum = bech32_create_checksum(hrp, data)
        return hrp + "1" + "".join(charset[d] for d in data + checksum)

    def _encode_base58check_address(self, payload: bytes, version: int) -> str:
        """Encode payload as base58check address."""
        import hashlib

        versioned = bytes([version]) + payload
        checksum = hashlib.sha256(hashlib.sha256(versioned).digest()).digest()[:4]
        data = versioned + checksum

        ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz"  # noqa: N806
        n = int.from_bytes(data, "big")
        result = ""
        while n > 0:
            n, r = divmod(n, 58)
            result = ALPHABET[r] + result

        # Add leading zeros
        for byte in data:
            if byte == 0:
                result = "1" + result
            else:
                break

        return result

    async def get_filter_header(self, block_height: int) -> str:
        """
        Get compact block filter header for given height.

        BIP157 filter headers form a chain for validation.
        """
        try:
            result = await self._api_call(
                "GET",
                f"v1/block/{block_height}/filter_header",
            )
            return result.get("filter_header", "")

        except Exception as e:
            logger.error(f"Failed to fetch filter header for height {block_height}: {e}")
            raise

    async def get_connected_peers(self) -> list[dict[str, Any]]:
        """Get list of connected P2P peers."""
        try:
            result = await self._api_call("GET", "v1/peers")
            return result.get("peers", [])

        except Exception as e:
            logger.warning(f"Failed to fetch peers: {e}")
            return []

    async def rescan_from_height(
        self,
        start_height: int,
        addresses: list[str] | None = None,
        outpoints: list[tuple[str, int]] | None = None,
    ) -> None:
        """
        Rescan blockchain from a specific height for addresses.

        This triggers neutrino to re-check compact block filters from
        the specified height for relevant transactions.

        Uses the neutrino-api v0.4 rescan endpoint:
        POST /v1/rescan with {"start_height": N, "addresses": [...]}

        Note: The v0.4 API only supports address-based rescans.
        Outpoints are tracked via address watches instead.

        Args:
            start_height: Block height to start rescan from
            addresses: List of addresses to scan for (required for v0.4)
            outpoints: List of (txid, vout) outpoints - not sent to the rescan
                      API; only recorded in the local watch set (see
                      add_watch_outpoint)

        Raises:
            ValueError: If start_height is invalid, rescan depth exceeds limits,
                or the watch list limit is reached while adding addresses
        """
        if not addresses:
            logger.warning("Rescan called without addresses - nothing to scan")
            return

        # Security: Validate start_height to prevent rescan abuse
        if start_height < self._min_valid_blockheight:
            raise ValueError(
                f"start_height {start_height} is below minimum valid height "
                f"{self._min_valid_blockheight} for {self.network}"
            )

        tip_height = await self.get_block_height()
        if start_height > tip_height:
            raise ValueError(f"start_height {start_height} is in the future (tip: {tip_height})")

        rescan_depth = tip_height - start_height
        if rescan_depth > self._max_rescan_depth:
            raise ValueError(
                f"Rescan depth {rescan_depth} exceeds maximum {self._max_rescan_depth} blocks"
            )

        # Track addresses locally (with limit check)
        for addr in addresses:
            await self.add_watch_address(addr)

        # Note: v0.4 API doesn't support outpoints in rescan
        if outpoints:
            logger.debug(
                "Outpoints parameter ignored in v0.4 rescan API. "
                "Use address-based watching instead."
            )
            for txid, vout in outpoints:
                self._watched_outpoints.add((txid, vout))

        try:
            await self._api_call(
                "POST",
                "v1/rescan",
                data={
                    "start_height": start_height,
                    "addresses": addresses,
                },
            )
            logger.info(f"Started rescan from height {start_height} for {len(addresses)} addresses")

        except Exception as e:
            logger.error(f"Failed to start rescan: {e}")
            raise

    async def close(self) -> None:
        """Close the HTTP client connection."""
        await self.client.aclose()

Blockchain backend using Neutrino light client.

Neutrino is a privacy-preserving Bitcoin light client that uses BIP157/BIP158 compact block filters instead of the BIP37 bloom filters used by traditional SPV clients.

Communication with the neutrino daemon is via REST API. The neutrino daemon should be running alongside this client.
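Because the daemon answers over REST and indexes UTXOs asynchronously, the `get_utxos` source above polls `v1/utxos` with exponential backoff after a rescan. A minimal sketch of that wait schedule follows. `backoff_schedule` is a hypothetical helper written for illustration, not part of the backend, and it assumes the values seen in the source (five attempts, base 1.5).

```python
def backoff_schedule(max_retries: int, base: float = 1.5) -> list[float]:
    """Return the sleep (in seconds) taken after each failed attempt.

    The final attempt never sleeps, so there is one fewer wait than attempts.
    """
    return [base**attempt for attempt in range(max_retries - 1)]


# Hypothetical usage: five attempts, as used right after a rescan.
waits = backoff_schedule(5)
total_wait = sum(waits)
print(waits, total_wait)
```

With five attempts the waits are 1.0, 1.5, 2.25 and 3.375 seconds, so an empty result after a rescan costs roughly 8 seconds of sleeping on top of the request time.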

Initialize Neutrino backend.

Args

neutrino_url
URL of the neutrino REST API (default port 8334)
network
Bitcoin network (mainnet, testnet, regtest, signet)
connect_peers
List of peer addresses to connect to (optional)
data_dir
Directory for neutrino data (headers, filters)
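As a rough illustration of how `neutrino_url` combines with the endpoint paths that appear in the class source, the sketch below builds the URL for a single-UTXO check. The base URL, the `utxo_check_url` helper, and the way the path is joined are assumptions made for this example; the backend's own `_api_call` may assemble requests differently.

```python
from typing import Optional
from urllib.parse import urlencode


def utxo_check_url(
    base_url: str,
    txid: str,
    vout: int,
    address: str,
    start_height: Optional[int] = None,
) -> str:
    """Build GET /v1/utxo/{txid}/{vout}?address=...&start_height=... (hypothetical helper)."""
    query: dict = {"address": address}
    # start_height is optional, mirroring verify_tx_output in the source.
    if start_height is not None:
        query["start_height"] = start_height
    return f"{base_url.rstrip('/')}/v1/utxo/{txid}/{vout}?{urlencode(query)}"


# Assumed local daemon on the documented default port 8334.
url = utxo_check_url("http://127.0.0.1:8334/", "ab" * 32, 0, "bc1qexample", 799999)
print(url)
```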

Ancestors

Methods

async def add_watch_address(self, address: str) ‑> None
async def add_watch_address(self, address: str) -> None:
    """
    Add an address to the local watch list.

    In neutrino-api v0.4, address watching is implicit - you just query
    UTXOs or do rescans with the addresses you care about. This method
    tracks addresses locally for convenience.

    Security: Limits the number of watched addresses to prevent memory
    exhaustion attacks.

    Args:
        address: Bitcoin address to watch

    Raises:
        ValueError: If watch list limit exceeded
    """
    if address in self._watched_addresses:
        return

    if len(self._watched_addresses) >= self._max_watched_addresses:
        logger.warning(
            f"Watch list limit reached ({self._max_watched_addresses}). "
            f"Cannot add address: {address[:20]}..."
        )
        raise ValueError(f"Watch list limit ({self._max_watched_addresses}) exceeded")

    self._watched_addresses.add(address)
    logger.trace(f"Watching address: {address}")

Add an address to the local watch list.

In neutrino-api v0.4, address watching is implicit - you just query UTXOs or do rescans with the addresses you care about. This method tracks addresses locally for convenience.

Security: Limits the number of watched addresses to prevent memory exhaustion attacks.

Args

address
Bitcoin address to watch

Raises

ValueError
If watch list limit exceeded
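The limit check described above can be sketched in isolation. `BoundedWatchList` is a hypothetical stand-in for the backend's internal set, and the limit of 2 is chosen only to keep the example short; the real limit is whatever the backend is configured with.

```python
class BoundedWatchList:
    """Hypothetical stand-in for the backend's local address tracking."""

    def __init__(self, max_addresses: int) -> None:
        self._max_addresses = max_addresses
        self._addresses: set = set()

    def add(self, address: str) -> None:
        # Re-adding a known address is a no-op and never reaches the limit check.
        if address in self._addresses:
            return
        if len(self._addresses) >= self._max_addresses:
            raise ValueError(f"Watch list limit ({self._max_addresses}) exceeded")
        self._addresses.add(address)

    def __len__(self) -> int:
        return len(self._addresses)


watch = BoundedWatchList(max_addresses=2)
watch.add("addr-a")
watch.add("addr-b")
watch.add("addr-a")  # duplicate: accepted silently
try:
    watch.add("addr-c")  # third distinct address: rejected
    limit_hit = False
except ValueError:
    limit_hit = True
```

Callers that add addresses in a loop, as `rescan_from_height` does, should expect the `ValueError` to surface partway through the batch.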
async def add_watch_outpoint(self, txid: str, vout: int) ‑> None
async def add_watch_outpoint(self, txid: str, vout: int) -> None:
    """
    Add an outpoint to the local watch list.

    In neutrino-api v0.4, outpoint watching is done via UTXO queries
    with the address parameter. This method tracks outpoints locally.

    Args:
        txid: Transaction ID
        vout: Output index
    """
    outpoint = (txid, vout)
    if outpoint in self._watched_outpoints:
        return

    self._watched_outpoints.add(outpoint)
    logger.debug(f"Watching outpoint: {txid}:{vout}")

Add an outpoint to the local watch list.

In neutrino-api v0.4, outpoint watching is done via UTXO queries with the address parameter. This method tracks outpoints locally.

Args

txid
Transaction ID
vout
Output index

async def broadcast_transaction(self, tx_hex: str) ‑> str
async def broadcast_transaction(self, tx_hex: str) -> str:
    """
    Broadcast transaction via neutrino to the P2P network.

    Neutrino maintains P2P connections and can broadcast transactions
    directly to connected peers.
    """
    try:
        result = await self._api_call(
            "POST",
            "v1/tx/broadcast",
            data={"tx_hex": tx_hex},
        )
        txid = result.get("txid", "")
        logger.info(f"Broadcast transaction: {txid}")
        return txid

    except Exception as e:
        logger.error(f"Failed to broadcast transaction: {e}")
        raise ValueError(f"Broadcast failed: {e}") from e

Broadcast transaction via neutrino to the P2P network.

Neutrino maintains P2P connections and can broadcast transactions directly to connected peers.
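Per the source, a failed request surfaces as `ValueError`, while a response without a `txid` field yields an empty string. A caller may want to treat both as "not broadcast". The sketch below assumes that policy; `StubBackend` and `broadcast_checked` are hypothetical names used only to make the example runnable without a daemon.

```python
import asyncio
from typing import Any, Optional


class StubBackend:
    """Hypothetical stand-in that mimics the documented broadcast behavior."""

    def __init__(self, response: Any) -> None:
        self._response = response

    async def broadcast_transaction(self, tx_hex: str) -> str:
        if isinstance(self._response, Exception):
            raise ValueError(f"Broadcast failed: {self._response}") from self._response
        return self._response.get("txid", "")


async def broadcast_checked(backend: StubBackend, tx_hex: str) -> Optional[str]:
    """Return the txid, or None if the broadcast failed or no txid came back."""
    try:
        txid = await backend.broadcast_transaction(tx_hex)
    except ValueError:
        return None
    return txid or None  # map the empty-string case to None


ok = asyncio.run(broadcast_checked(StubBackend({"txid": "ab" * 32}), "00"))
missing = asyncio.run(broadcast_checked(StubBackend({}), "00"))
failed = asyncio.run(broadcast_checked(StubBackend(RuntimeError("no peers")), "00"))
```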

def can_estimate_fee(self) ‑> bool
def can_estimate_fee(self) -> bool:
    """Neutrino cannot reliably estimate fees - requires full node."""
    return False

Neutrino cannot reliably estimate fees - requires full node.

def can_provide_neutrino_metadata(self) ‑> bool
def can_provide_neutrino_metadata(self) -> bool:
    """
    Neutrino backend cannot reliably provide metadata for all UTXOs.

    Light clients can only provide metadata for UTXOs they've been watching.
    They cannot provide metadata for arbitrary addresses like full nodes can.

    Returns:
        False - Neutrino cannot provide metadata for arbitrary UTXOs
    """
    return False

Neutrino backend cannot reliably provide metadata for all UTXOs.

Light clients can only provide metadata for UTXOs they've been watching. They cannot provide metadata for arbitrary addresses like full nodes can.

Returns

False - Neutrino cannot provide metadata for arbitrary UTXOs
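The metadata in question is a scriptPubKey plus a block height supplied by the peer. The sketch below shows how such a hint might be sanity-checked before use, following the four script templates and the height checks visible in the class source. Both function names, the error strings, and the numeric limits in the usage lines are assumptions for illustration, not the backend's actual values.

```python
from typing import Optional


def classify_scriptpubkey(scriptpubkey_hex: str) -> Optional[str]:
    """Return the script type for the four templates the backend recognizes."""
    try:
        script = bytes.fromhex(scriptpubkey_hex)
    except ValueError:
        return None  # not valid hex
    if len(script) == 22 and script[:2] == b"\x00\x14":
        return "p2wpkh"
    if len(script) == 34 and script[:2] == b"\x00\x20":
        return "p2wsh"
    if len(script) == 25 and script[:3] == b"\x76\xa9\x14" and script[23:] == b"\x88\xac":
        return "p2pkh"
    if len(script) == 23 and script[:2] == b"\xa9\x14" and script[22:] == b"\x87":
        return "p2sh"
    return None


def height_hint_error(
    blockheight: int, tip_height: int, min_height: int, max_depth: int
) -> Optional[str]:
    """Return a reason to reject the height hint, or None if it is acceptable."""
    if blockheight < min_height:
        return "below minimum height"
    if blockheight > tip_height:
        return "in the future"
    if tip_height - blockheight > max_depth:
        return "rescan too deep"
    return None


# Hypothetical hint from a peer: a P2WPKH script confirmed 10 blocks ago.
script_type = classify_scriptpubkey("0014" + "00" * 20)
problem = height_hint_error(799_990, tip_height=800_000, min_height=0, max_depth=1000)
```

Rejecting hints that are too old or in the future before issuing any request is what keeps a malicious peer from triggering an expensive rescan.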

async def close(self) ‑> None
async def close(self) -> None:
    """Close the HTTP client connection."""
    await self.client.aclose()

Close the HTTP client connection.

async def estimate_fee(self, target_blocks: int) ‑> float
async def estimate_fee(self, target_blocks: int) -> float:
    """
    Estimate fee in sat/vbyte for target confirmation blocks.

    Neutrino does not support fee estimation - returns conservative defaults.
    Use can_estimate_fee() to check if reliable estimation is available.
    """
    # Neutrino cannot estimate fees - return conservative defaults
    if target_blocks <= 1:
        return 5.0
    elif target_blocks <= 3:
        return 2.0
    elif target_blocks <= 6:
        return 1.0
    else:
        return 1.0

Estimate fee in sat/vbyte for target confirmation blocks.

Neutrino does not support fee estimation - returns conservative defaults. Use can_estimate_fee() to check if reliable estimation is available.
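Since the returned values are fixed tiers rather than estimates, a caller might prefer an explicitly configured rate whenever `can_estimate_fee()` is false. The sketch below restates the tiers from the source and shows one possible selection policy; `fallback_fee_rate` and `pick_fee_rate` are hypothetical helpers, and the policy itself is an assumption rather than documented behavior.

```python
from typing import Optional


def fallback_fee_rate(target_blocks: int) -> float:
    """Fixed sat/vbyte tiers equivalent to those in the source above."""
    if target_blocks <= 1:
        return 5.0
    if target_blocks <= 3:
        return 2.0
    return 1.0


def pick_fee_rate(can_estimate: bool, backend_rate: float, user_rate: Optional[float]) -> float:
    """Prefer a user-supplied rate when the backend cannot really estimate."""
    if not can_estimate and user_rate is not None:
        return user_rate
    return backend_rate


# Neutrino case: can_estimate is False, so a configured 12.5 sat/vbyte wins.
chosen = pick_fee_rate(False, fallback_fee_rate(3), user_rate=12.5)
```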

async def get_address_balance(self, address: str) ‑> int
async def get_address_balance(self, address: str) -> int:
    """Get balance for an address in satoshis."""
    utxos = await self.get_utxos([address])
    balance = sum(utxo.value for utxo in utxos)
    logger.debug(f"Balance for {address}: {balance} sats")
    return balance

Get balance for an address in satoshis.

async def get_block_hash(self, block_height: int) ‑> str
async def get_block_hash(self, block_height: int) -> str:
    """Get block hash for given height."""
    try:
        result = await self._api_call(
            "GET",
            f"v1/block/{block_height}/header",
        )
        block_hash = result.get("hash", "")
        logger.debug(f"Block hash for height {block_height}: {block_hash}")
        return block_hash

    except Exception as e:
        logger.error(f"Failed to fetch block hash for height {block_height}: {e}")
        raise

Get block hash for given height.

async def get_block_height(self) ‑> int
async def get_block_height(self) -> int:
    """Get current blockchain height from neutrino."""
    try:
        result = await self._api_call("GET", "v1/status")
        height = result.get("block_height", 0)
        logger.debug(f"Current block height: {height}")
        return height

    except Exception as e:
        logger.error(f"Failed to fetch block height: {e}")
        raise

Get current blockchain height from neutrino.

async def get_block_time(self, block_height: int) ‑> int
async def get_block_time(self, block_height: int) -> int:
    """Get block time (unix timestamp) for given height."""
    try:
        result = await self._api_call(
            "GET",
            f"v1/block/{block_height}/header",
        )
        timestamp = result.get("timestamp", 0)
        logger.debug(f"Block {block_height} timestamp: {timestamp}")
        return timestamp

    except Exception as e:
        logger.error(f"Failed to fetch block time for height {block_height}: {e}")
        raise

Get block time (unix timestamp) for given height.

async def get_connected_peers(self) ‑> list[dict[str, typing.Any]]
async def get_connected_peers(self) -> list[dict[str, Any]]:
    """Get list of connected P2P peers."""
    try:
        result = await self._api_call("GET", "v1/peers")
        return result.get("peers", [])

    except Exception as e:
        logger.warning(f"Failed to fetch peers: {e}")
        return []

Get list of connected P2P peers.

async def get_filter_header(self, block_height: int) ‑> str
async def get_filter_header(self, block_height: int) -> str:
    """
    Get compact block filter header for given height.

    BIP157 filter headers form a chain for validation.
    """
    try:
        result = await self._api_call(
            "GET",
            f"v1/block/{block_height}/filter_header",
        )
        return result.get("filter_header", "")

    except Exception as e:
        logger.error(f"Failed to fetch filter header for height {block_height}: {e}")
        raise

Get compact block filter header for given height.

BIP157 filter headers form a chain for validation.
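As a rough illustration of what "form a chain" means: as BIP157 describes it, each filter header is the double-SHA256 of the block's filter hash concatenated with the previous filter header, starting from 32 zero bytes. The sketch below uses made-up filter bytes and hypothetical helper names; it does not call the neutrino API and ignores the display byte order of the hex string returned by get_filter_header().

```python
import hashlib


def double_sha256(data: bytes) -> bytes:
    """SHA256 applied twice, as used throughout Bitcoin."""
    return hashlib.sha256(hashlib.sha256(data).digest()).digest()


def next_filter_header(filter_bytes: bytes, prev_header: bytes) -> bytes:
    """Chain one serialized filter onto the previous filter header."""
    filter_hash = double_sha256(filter_bytes)
    return double_sha256(filter_hash + prev_header)


# The header preceding the genesis block's filter is defined as 32 zero bytes.
GENESIS_PREV_HEADER = bytes(32)

# Made-up filter contents, purely for demonstration.
filters = [b"\x01\xaa", b"\x02\xbb\xcc", b"\x00"]

header = GENESIS_PREV_HEADER
chain = []
for filter_bytes in filters:
    header = next_filter_header(filter_bytes, header)
    chain.append(header)

print([h.hex()[:16] for h in chain])
```

Since every header commits to its predecessor, changing any earlier filter changes all later headers, which is what lets a light client compare header chains from several peers.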

async def get_transaction(self, txid: str) ‑> Transaction | None
async def get_transaction(self, txid: str) -> Transaction | None:
    """
    Get transaction by txid.

    Note: Neutrino uses compact block filters (BIP158) and can only fetch
    transactions for addresses it has rescanned. It cannot fetch arbitrary
    transactions by txid alone. This method always returns None.

    For verification after broadcast, rely on UTXO checks with known addresses
    and block heights instead.
    """
    # Neutrino doesn't support fetching arbitrary transactions by txid
    # It can only work with UTXOs for known addresses via compact filters
    return None

Get transaction by txid.

Note: Neutrino uses compact block filters (BIP158) and can only fetch transactions for addresses it has rescanned. It cannot fetch arbitrary transactions by txid alone. This method always returns None.

For verification after broadcast, rely on UTXO checks with known addresses and block heights instead.

async def get_utxo(self, txid: str, vout: int) ‑> UTXO | None
async def get_utxo(self, txid: str, vout: int) -> UTXO | None:
    """Get a specific UTXO from the blockchain.

    The Neutrino backend always returns None here: without the owning
    address it cannot look up an arbitrary outpoint."""
    # Neutrino uses compact block filters and cannot perform arbitrary
    # UTXO lookups without the address. The API endpoint v1/utxo/{txid}/{vout}
    # requires the 'address' parameter to scan filter matches.
    #
    # If we don't have the address, we can't look it up.
    # Callers should use verify_utxo_with_metadata() instead.
    return None

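Get a specific UTXO from the blockchain. The Neutrino backend always returns None here: compact block filters cannot look up an arbitrary outpoint without the owning address, so callers should use verify_utxo_with_metadata() instead.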

async def get_utxos(self, addresses: list[str]) ‑> list[UTXO]
async def get_utxos(self, addresses: list[str]) -> list[UTXO]:
    """
    Get UTXOs for given addresses using neutrino's rescan capability.

    Neutrino will scan the blockchain using compact block filters
    to find transactions relevant to the watched addresses.

    On first call, triggers a full blockchain rescan from genesis to ensure
    all historical UTXOs are found (critical for wallets funded before neutrino started).

    After initial rescan, automatically rescans if new blocks have arrived
    to detect transactions that occurred after the last scan.
    """
    utxos: list[UTXO] = []

    # Add addresses to watch list
    for address in addresses:
        await self.add_watch_address(address)

    # Get current tip height to check if new blocks have arrived
    current_height = await self.get_block_height()

    # On first UTXO query, trigger a full blockchain rescan to find existing UTXOs
    # This is critical for wallets that were funded before neutrino was watching them
    logger.debug(
        f"get_utxos: _initial_rescan_done={self._initial_rescan_done}, "
        f"watched_addresses={len(self._watched_addresses)}, "
        f"last_rescan={self._last_rescan_height}, current={current_height}"
    )
    if not self._initial_rescan_done and self._watched_addresses:
        logger.info(
            f"Performing initial blockchain rescan for {len(self._watched_addresses)} "
            "watched addresses (this may take a moment)..."
        )
        try:
            # Trigger rescan from block 0 for all watched addresses
            await self._api_call(
                "POST",
                "v1/rescan",
                data={
                    "addresses": list(self._watched_addresses),
                    "start_height": 0,
                },
            )
            # Wait for rescan to complete (neutrino processes this asynchronously)
            # On regtest with ~3000 blocks, this typically takes 5-10 seconds
            await asyncio.sleep(10.0)
            self._initial_rescan_done = True
            self._last_rescan_height = current_height
            self._rescan_in_progress = False
            self._just_rescanned = True
            logger.info("Initial blockchain rescan completed")
        except Exception as e:
            logger.warning(f"Initial rescan failed (will retry on next sync): {e}")
            self._rescan_in_progress = False
    elif current_height > self._last_rescan_height and not self._rescan_in_progress:
        # New blocks have arrived since last rescan - need to scan them
        # This is critical for finding CoinJoin outputs that were just confirmed
        # We rescan ALL watched addresses, not just the ones in the current query,
        # because wallet sync happens mixdepth by mixdepth and we need to find
        # outputs to any of our addresses
        self._rescan_in_progress = True
        logger.info(
            f"New blocks detected ({self._last_rescan_height} -> {current_height}), "
            f"rescanning for {len(self._watched_addresses)} watched addresses..."
        )
        try:
            # Rescan from just before the last known height to catch edge cases
            start_height = max(0, self._last_rescan_height - 1)

            await self._api_call(
                "POST",
                "v1/rescan",
                data={
                    "addresses": list(self._watched_addresses),
                    "start_height": start_height,
                },
            )
            # Wait for rescan to complete
            # NOTE: The rescan is asynchronous - neutrino needs time to:
            # 1. Match block filters
            # 2. Download full blocks that match
            # 3. Extract and index UTXOs
            # We wait 7 seconds to allow time for indexing to complete
            await asyncio.sleep(7.0)

            self._last_rescan_height = current_height
            self._rescan_in_progress = False
            self._just_rescanned = True
            logger.info(
                f"Incremental rescan completed from block {start_height} to {current_height}"
            )
        except Exception as e:
            logger.warning(f"Incremental rescan failed: {e}")
            self._rescan_in_progress = False
    elif self._rescan_in_progress:
        # A rescan was just triggered by a previous get_utxos call in this batch
        # Wait a bit for it to complete, but don't wait the full 7 seconds
        logger.debug("Rescan in progress from previous query, waiting briefly...")
        await asyncio.sleep(1.0)
    else:
        # No new blocks, just wait for filter matching / async UTXO lookups
        await asyncio.sleep(0.5)

    try:
        # Request UTXO scan for addresses with retry logic
        # The neutrino API performs UTXO lookups asynchronously, so we may need
        # to retry if the initial query happens before async indexing completes.
        # We only retry if we just completed a rescan (indicated by _just_rescanned flag)
        # to avoid unnecessary delays when scanning addresses that have no UTXOs.
        max_retries = 5 if self._just_rescanned else 1
        result: dict[str, Any] = {"utxos": []}

        for retry in range(max_retries):
            result = await self._api_call(
                "POST",
                "v1/utxos",
                data={"addresses": addresses},
            )

            utxo_count = len(result.get("utxos", []))

            # If we found UTXOs or this is the last retry, proceed
            if utxo_count > 0 or retry == max_retries - 1:
                if retry > 0 and self._just_rescanned:
                    logger.debug(f"Found {utxo_count} UTXOs after {retry + 1} attempts")
                break

            # No UTXOs yet - wait with exponential backoff before retrying
            # This allows time for async UTXO indexing to complete
            wait_time = 1.5**retry  # 1.0s, 1.5s, 2.25s, 3.38s (no wait after the last attempt)
            logger.debug(
                f"No UTXOs found on attempt {retry + 1}/{max_retries}, "
                f"waiting {wait_time:.2f}s for async indexing..."
            )
            await asyncio.sleep(wait_time)

        # Reset the flag after we've completed the UTXO query
        # (subsequent queries in this batch won't need full retry)
        if self._just_rescanned:
            self._just_rescanned = False

        tip_height = await self.get_block_height()

        for utxo_data in result.get("utxos", []):
            height = utxo_data.get("height", 0)
            confirmations = 0
            if height > 0:
                confirmations = tip_height - height + 1

            utxo = UTXO(
                txid=utxo_data["txid"],
                vout=utxo_data["vout"],
                value=utxo_data["value"],
                address=utxo_data.get("address", ""),
                confirmations=confirmations,
                scriptpubkey=utxo_data.get("scriptpubkey", ""),
                height=height if height > 0 else None,
            )
            utxos.append(utxo)

        logger.debug(f"Found {len(utxos)} UTXOs for {len(addresses)} addresses")

    except Exception as e:
        logger.error(f"Failed to fetch UTXOs: {e}")

    return utxos

Get UTXOs for given addresses using neutrino's rescan capability.

Neutrino will scan the blockchain using compact block filters to find transactions relevant to the watched addresses.

On first call, triggers a full blockchain rescan from genesis to ensure all historical UTXOs are found (critical for wallets funded before neutrino started).

After initial rescan, automatically rescans if new blocks have arrived to detect transactions that occurred after the last scan.
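The retry loop in the source above sleeps for `1.5**retry` seconds between UTXO queries and does not sleep after the final attempt. A minimal sketch of that schedule, with `backoff_waits` as a hypothetical helper name:

```python
def backoff_waits(max_retries: int, base: float = 1.5) -> list[float]:
    """Waits slept between attempts; no wait follows the final attempt."""
    return [base**retry for retry in range(max_retries - 1)]


after_rescan = backoff_waits(5)  # the loop uses 5 attempts right after a rescan
no_rescan = backoff_waits(1)     # and a single attempt otherwise

print(after_rescan, sum(after_rescan))
print(no_rescan)
```

This is the worst case, reached only when every attempt returns no UTXOs; the loop exits as soon as one query finds any. The fixed sleeps that follow a rescan request (10 s for the initial rescan, 7 s for an incremental one) come on top of this.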

def has_mempool_access(self) ‑> bool
def has_mempool_access(self) -> bool:
    """Neutrino cannot access mempool - only sees confirmed transactions.

    BIP157/158 compact block filters only match confirmed blocks.
    Unconfirmed transactions in the mempool are not visible to Neutrino.

    This means verify_tx_output() will return False for valid transactions
    that are in the mempool but not yet confirmed. Takers using Neutrino
    must use alternative verification strategies (e.g., trust maker ACKs,
    multi-maker broadcast, wait for confirmation).
    """
    return False

Neutrino cannot access mempool - only sees confirmed transactions.

BIP157/158 compact block filters only match confirmed blocks. Unconfirmed transactions in the mempool are not visible to Neutrino.

This means verify_tx_output() will return False for valid transactions that are in the mempool but not yet confirmed. Takers using Neutrino must use alternative verification strategies (e.g., trust maker ACKs, multi-maker broadcast, wait for confirmation).

def requires_neutrino_metadata(self) ‑> bool
def requires_neutrino_metadata(self) -> bool:
    """
    Neutrino backend requires metadata for arbitrary UTXO verification.

    Without scriptPubKey and blockheight hints, Neutrino cannot verify
    UTXOs that it hasn't been watching from the start.

    Returns:
        True - Neutrino always requires metadata for counterparty UTXOs
    """
    return True

Neutrino backend requires metadata for arbitrary UTXO verification.

Without scriptPubKey and blockheight hints, Neutrino cannot verify UTXOs that it hasn't been watching from the start.

Returns

True - Neutrino always requires metadata for counterparty UTXOs
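A caller might branch on this flag roughly as follows. This is a sketch under stated assumptions: `StubNeutrinoBackend`, `StubResult`, and `check_counterparty_utxo` are hypothetical stand-ins that mimic only the method names documented here, not the real jmwallet classes.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class StubResult:
    """Hypothetical stand-in for UTXOVerificationResult."""
    valid: bool
    error: str = ""


class StubNeutrinoBackend:
    """Hypothetical backend that mimics the three methods used below."""

    def requires_neutrino_metadata(self) -> bool:
        return True

    async def verify_utxo_with_metadata(self, txid, vout, scriptpubkey, blockheight):
        return StubResult(valid=True)

    async def get_utxo(self, txid, vout):
        return None


async def check_counterparty_utxo(backend, txid, vout, scriptpubkey=None, blockheight=None):
    """Pick the verification path based on what the backend needs."""
    if backend.requires_neutrino_metadata():
        if scriptpubkey is None or blockheight is None:
            return False  # the peer did not supply the required hints
        result = await backend.verify_utxo_with_metadata(txid, vout, scriptpubkey, blockheight)
        return result.valid
    # Full-node style backends can look the outpoint up directly.
    return (await backend.get_utxo(txid, vout)) is not None


backend = StubNeutrinoBackend()
with_hints = asyncio.run(
    check_counterparty_utxo(backend, "ab" * 32, 0, "0014" + "00" * 20, 800_000)
)
without_hints = asyncio.run(check_counterparty_utxo(backend, "ab" * 32, 0))
print(with_hints, without_hints)
```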

async def rescan_from_height(self,
start_height: int,
addresses: list[str] | None = None,
outpoints: list[tuple[str, int]] | None = None) ‑> None
async def rescan_from_height(
    self,
    start_height: int,
    addresses: list[str] | None = None,
    outpoints: list[tuple[str, int]] | None = None,
) -> None:
    """
    Rescan blockchain from a specific height for addresses.

    This triggers neutrino to re-check compact block filters from
    the specified height for relevant transactions.

    Uses the neutrino-api v0.4 rescan endpoint:
    POST /v1/rescan with {"start_height": N, "addresses": [...]}

    Note: The v0.4 API only supports address-based rescans.
    Outpoints are tracked via address watches instead.

    Args:
        start_height: Block height to start rescan from
        addresses: List of addresses to scan for (required for v0.4)
        outpoints: List of (txid, vout) outpoints - not directly supported,
                  will be ignored (use add_watch_outpoint instead)

    Raises:
        ValueError: If start_height is invalid or rescan depth exceeds limits
    """
    if not addresses:
        logger.warning("Rescan called without addresses - nothing to scan")
        return

    # Security: Validate start_height to prevent rescan abuse
    if start_height < self._min_valid_blockheight:
        raise ValueError(
            f"start_height {start_height} is below minimum valid height "
            f"{self._min_valid_blockheight} for {self.network}"
        )

    tip_height = await self.get_block_height()
    if start_height > tip_height:
        raise ValueError(f"start_height {start_height} is in the future (tip: {tip_height})")

    rescan_depth = tip_height - start_height
    if rescan_depth > self._max_rescan_depth:
        raise ValueError(
            f"Rescan depth {rescan_depth} exceeds maximum {self._max_rescan_depth} blocks"
        )

    # Track addresses locally (with limit check)
    for addr in addresses:
        await self.add_watch_address(addr)

    # Note: v0.4 API doesn't support outpoints in rescan
    if outpoints:
        logger.debug(
            "Outpoints parameter ignored in v0.4 rescan API. "
            "Use address-based watching instead."
        )
        for txid, vout in outpoints:
            self._watched_outpoints.add((txid, vout))

    try:
        await self._api_call(
            "POST",
            "v1/rescan",
            data={
                "start_height": start_height,
                "addresses": addresses,
            },
        )
        logger.info(f"Started rescan from height {start_height} for {len(addresses)} addresses")

    except Exception as e:
        logger.error(f"Failed to start rescan: {e}")
        raise

Rescan blockchain from a specific height for addresses.

This triggers neutrino to re-check compact block filters from the specified height for relevant transactions.

Uses the neutrino-api v0.4 rescan endpoint: POST /v1/rescan with {"start_height": N, "addresses": [...]}

Note: The v0.4 API only supports address-based rescans. Outpoints are tracked via address watches instead.

Args

start_height
Block height to start rescan from
addresses
List of addresses to scan for (required for v0.4)
outpoints
List of (txid, vout) outpoints - not directly supported, will be ignored (use add_watch_outpoint instead)

Raises

ValueError
If start_height is invalid or rescan depth exceeds limits
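
The three range checks that can raise ValueError can be sketched as a pure function. The limits passed in below are made-up example values; the real `_min_valid_blockheight` and `_max_rescan_depth` are set elsewhere in the backend and are not shown in this section. `check_rescan_range` is a hypothetical name.

```python
def check_rescan_range(
    start_height: int,
    tip_height: int,
    min_valid_height: int,
    max_rescan_depth: int,
) -> int:
    """Return the rescan depth, or raise ValueError if the range is rejected."""
    if start_height < min_valid_height:
        raise ValueError(
            f"start_height {start_height} is below minimum valid height {min_valid_height}"
        )
    if start_height > tip_height:
        raise ValueError(f"start_height {start_height} is in the future (tip: {tip_height})")
    depth = tip_height - start_height
    if depth > max_rescan_depth:
        raise ValueError(f"Rescan depth {depth} exceeds maximum {max_rescan_depth} blocks")
    return depth


# Example limits chosen for illustration only.
print(check_rescan_range(900, 1000, min_valid_height=0, max_rescan_depth=500))

try:
    check_rescan_range(100, 1000, min_valid_height=0, max_rescan_depth=500)
except ValueError as exc:
    print(f"rejected: {exc}")
```

Note that in the source above these checks run only when a non-empty address list is passed; a call without addresses logs a warning and returns before any validation.
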

async def verify_tx_output(self, txid: str, vout: int, address: str, start_height: int | None = None) ‑> bool
async def verify_tx_output(
    self,
    txid: str,
    vout: int,
    address: str,
    start_height: int | None = None,
) -> bool:
    """
    Verify that a specific transaction output exists using neutrino's UTXO endpoint.

    Uses GET /v1/utxo/{txid}/{vout}?address=...&start_height=... to check if
    the output exists. This works because neutrino uses compact block filters
    that can match on addresses.

    Args:
        txid: Transaction ID to verify
        vout: Output index to check
        address: The address that should own this output
        start_height: Block height hint for efficient scanning (recommended)

    Returns:
        True if the output exists, False otherwise
    """
    try:
        params: dict[str, str | int] = {"address": address}
        if start_height is not None:
            params["start_height"] = start_height

        result = await self._api_call(
            "GET",
            f"v1/utxo/{txid}/{vout}",
            params=params,
        )

        # If we got a response with unspent status, the output exists
        # Note: Even spent outputs confirm the transaction was broadcast
        if result is not None:
            logger.debug(
                f"Verified tx output {txid}:{vout} exists "
                f"(unspent={result.get('unspent', 'unknown')})"
            )
            return True

        return False

    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            # Output not found
            logger.debug(f"Tx output {txid}:{vout} not found")
            return False
        logger.warning(f"Error verifying tx output {txid}:{vout}: {e}")
        return False
    except Exception as e:
        logger.warning(f"Error verifying tx output {txid}:{vout}: {e}")
        return False

Verify that a specific transaction output exists using neutrino's UTXO endpoint.

Uses GET /v1/utxo/{txid}/{vout}?address=...&start_height=... to check if the output exists. This works because neutrino uses compact block filters that can match on addresses.

Args

txid
Transaction ID to verify
vout
Output index to check
address
The address that should own this output
start_height
Block height hint for efficient scanning (recommended)

Returns

True if the endpoint returns a result for the output (spent or unspent), False if the output is not found or the query fails

async def verify_utxo_with_metadata(self, txid: str, vout: int, scriptpubkey: str, blockheight: int) ‑> UTXOVerificationResult
async def verify_utxo_with_metadata(
    self,
    txid: str,
    vout: int,
    scriptpubkey: str,
    blockheight: int,
) -> UTXOVerificationResult:
    """
    Verify a UTXO using provided metadata (neutrino_compat feature).

    This is the key method that enables Neutrino light clients to verify
    counterparty UTXOs in CoinJoin without arbitrary blockchain queries.

    Uses the neutrino-api v0.4 UTXO check endpoint which requires:
    - address: The Bitcoin address that owns the UTXO (derived from scriptPubKey)
    - start_height: Block height to start scanning from (for efficiency)

    The API scans from start_height to chain tip using compact block filters
    to determine if the UTXO exists and whether it has been spent.

    Security: Validates blockheight to prevent rescan abuse attacks where
    malicious peers provide very low blockheights to trigger expensive rescans.

    Args:
        txid: Transaction ID
        vout: Output index
        scriptpubkey: Expected scriptPubKey (hex) - used to derive address
        blockheight: Block height where UTXO was confirmed - scan start hint

    Returns:
        UTXOVerificationResult with verification status and UTXO data
    """
    # Security: Validate blockheight to prevent rescan abuse
    tip_height = await self.get_block_height()

    if blockheight < self._min_valid_blockheight:
        return UTXOVerificationResult(
            valid=False,
            error=f"Blockheight {blockheight} is below minimum valid height "
            f"{self._min_valid_blockheight} for {self.network}",
        )

    if blockheight > tip_height:
        return UTXOVerificationResult(
            valid=False,
            error=f"Blockheight {blockheight} is in the future (tip: {tip_height})",
        )

    # Limit rescan depth to prevent DoS
    rescan_depth = tip_height - blockheight
    if rescan_depth > self._max_rescan_depth:
        return UTXOVerificationResult(
            valid=False,
            error=f"Rescan depth {rescan_depth} exceeds max {self._max_rescan_depth}. "
            f"UTXO too old for efficient verification.",
        )

    logger.debug(
        f"Verifying UTXO {txid}:{vout} with metadata "
        f"(scriptpubkey={scriptpubkey[:20]}..., blockheight={blockheight})"
    )

    # Step 1: Derive address from scriptPubKey
    # The neutrino-api v0.4 requires the address for UTXO lookup
    address = self._scriptpubkey_to_address(scriptpubkey)
    if not address:
        return UTXOVerificationResult(
            valid=False,
            error=f"Could not derive address from scriptPubKey: {scriptpubkey[:40]}...",
        )

    logger.debug(f"Derived address {address} from scriptPubKey")

    try:
        # Step 2: Query the specific UTXO using the v0.4 API
        # GET /v1/utxo/{txid}/{vout}?address=...&start_height=...
        #
        # The start_height parameter is critical for performance:
        # - Scanning 1 block takes ~0.01s
        # - Scanning 100 blocks takes ~0.5s
        # - Scanning 10,000+ blocks can take minutes
        #
        # We use blockheight - 1 as a safety margin in case of reorgs
        start_height = max(0, blockheight - 1)

        result = await self._api_call(
            "GET",
            f"v1/utxo/{txid}/{vout}",
            params={"address": address, "start_height": start_height},
        )

        # Check if UTXO is unspent
        if not result.get("unspent", False):
            spending_txid = result.get("spending_txid", "unknown")
            spending_height = result.get("spending_height", "unknown")
            return UTXOVerificationResult(
                valid=False,
                error=f"UTXO has been spent in tx {spending_txid} at height {spending_height}",
            )

        # Step 3: Verify scriptPubKey matches
        actual_scriptpubkey = result.get("scriptpubkey", "")
        scriptpubkey_matches = actual_scriptpubkey.lower() == scriptpubkey.lower()

        if not scriptpubkey_matches:
            return UTXOVerificationResult(
                valid=False,
                value=result.get("value", 0),
                error=f"ScriptPubKey mismatch: expected {scriptpubkey[:20]}..., "
                f"got {actual_scriptpubkey[:20]}...",
                scriptpubkey_matches=False,
            )

        # Step 4: Calculate confirmations
        tip_height = await self.get_block_height()
        # The blockheight parameter is the confirmation height hint from the peer
        confirmations = tip_height - blockheight + 1 if blockheight > 0 else 0

        logger.info(
            f"UTXO {txid}:{vout} verified: value={result.get('value', 0)}, "
            f"confirmations={confirmations}"
        )

        return UTXOVerificationResult(
            valid=True,
            value=result.get("value", 0),
            confirmations=confirmations,
            scriptpubkey_matches=True,
        )

    except httpx.HTTPStatusError as e:
        if e.response.status_code == 404:
            return UTXOVerificationResult(
                valid=False,
                error="UTXO not found - may not exist or address derivation failed",
            )
        return UTXOVerificationResult(
            valid=False,
            error=f"UTXO query failed: {e}",
        )
    except Exception as e:
        return UTXOVerificationResult(
            valid=False,
            error=f"Verification failed: {e}",
        )

Verify a UTXO using provided metadata (neutrino_compat feature).

This is the key method that enables Neutrino light clients to verify counterparty UTXOs in CoinJoin without arbitrary blockchain queries.

Uses the neutrino-api v0.4 UTXO check endpoint which requires:
- address: The Bitcoin address that owns the UTXO (derived from scriptPubKey)
- start_height: Block height to start scanning from (for efficiency)

The API scans from start_height to chain tip using compact block filters to determine if the UTXO exists and whether it has been spent.

Security: Validates blockheight to prevent rescan abuse attacks where malicious peers provide very low blockheights to trigger expensive rescans.

Args

txid
Transaction ID
vout
Output index
scriptpubkey
Expected scriptPubKey (hex) - used to derive address
blockheight
Block height where UTXO was confirmed - scan start hint

Returns

UTXOVerificationResult with verification status and UTXO data
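The height arithmetic in the source above is small but easy to get off by one. The sketch below restates it as two standalone functions; the names `scan_start_height` and `confirmations_at` are hypothetical.

```python
def scan_start_height(blockheight: int) -> int:
    """Start one block before the hinted height, never below zero."""
    return max(0, blockheight - 1)


def confirmations_at(tip_height: int, blockheight: int) -> int:
    """A UTXO mined in the tip block has 1 confirmation, not 0."""
    return tip_height - blockheight + 1 if blockheight > 0 else 0


print(scan_start_height(850_000))          # scan begins at 849999
print(confirmations_at(850_005, 850_000))  # 6 confirmations
print(confirmations_at(850_000, 850_000))  # 1 confirmation
```

The confirmation count is derived from the peer-supplied blockheight hint rather than from a height returned by the API, so it is only as trustworthy as the hint, within the bounds the earlier validation enforces.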

async def wait_for_sync(self, timeout: float = 300.0) ‑> bool
async def wait_for_sync(self, timeout: float = 300.0) -> bool:
    """
    Wait for neutrino to sync block headers and filters.

    Args:
        timeout: Maximum time to wait in seconds

    Returns:
        True if synced, False if timeout
    """
    start_time = asyncio.get_event_loop().time()

    while True:
        try:
            status = await self._api_call("GET", "v1/status")
            synced = status.get("synced", False)
            block_height = status.get("block_height", 0)
            filter_height = status.get("filter_height", 0)

            if synced and block_height == filter_height:
                self._synced = True
                self._filter_header_tip = block_height
                logger.info(f"Neutrino synced at height {block_height}")
                return True

            logger.debug(f"Syncing... blocks: {block_height}, filters: {filter_height}")

        except Exception as e:
            logger.warning(f"Waiting for neutrino daemon: {e}")

        elapsed = asyncio.get_event_loop().time() - start_time
        if elapsed > timeout:
            logger.error("Neutrino sync timeout")
            return False

        await asyncio.sleep(2.0)

Wait for neutrino to sync block headers and filters.

Args

timeout
Maximum time to wait in seconds

Returns

True if synced, False if timeout
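The polling pattern can be tried without a running daemon by injecting the status call. This is a sketch, not the backend's code: `poll_until_synced` and `fake_status` are hypothetical, and the short interval is chosen only to keep the demo fast.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


async def poll_until_synced(
    fetch_status: Callable[[], Awaitable[dict[str, Any]]],
    timeout: float = 300.0,
    interval: float = 2.0,
) -> bool:
    """Poll until headers and filters agree, or give up after `timeout` seconds."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        try:
            status = await fetch_status()
            heights_match = status.get("block_height", 0) == status.get("filter_height", 0)
            if status.get("synced", False) and heights_match:
                return True
        except Exception:
            pass  # daemon not reachable yet; keep polling
        if loop.time() > deadline:
            return False
        await asyncio.sleep(interval)


async def demo() -> bool:
    # Two canned responses: filters lag behind first, then catch up.
    responses = iter(
        [
            {"synced": False, "block_height": 10, "filter_height": 4},
            {"synced": True, "block_height": 10, "filter_height": 10},
        ]
    )

    async def fake_status() -> dict[str, Any]:
        return next(responses)

    return await poll_until_synced(fake_status, timeout=5.0, interval=0.01)


synced = asyncio.run(demo())
print(synced)
```

As in the original, the timeout is checked only between polls, so the call can overrun `timeout` by roughly one poll interval plus the duration of a status request.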


class NeutrinoConfig (network: str = 'mainnet',
data_dir: str = '/data/neutrino',
listen_port: int = 8334,
peers: list[str] | None = None,
tor_socks: str | None = None)
class NeutrinoConfig:
    """
    Configuration for running a neutrino daemon.

    This configuration can be used to start a neutrino process
    programmatically or generate a config file.
    """

    def __init__(
        self,
        network: str = "mainnet",
        data_dir: str = "/data/neutrino",
        listen_port: int = 8334,
        peers: list[str] | None = None,
        tor_socks: str | None = None,
    ):
        """
        Initialize neutrino configuration.

        Args:
            network: Bitcoin network (mainnet, testnet, regtest, signet)
            data_dir: Directory for neutrino data
            listen_port: Port for REST API
            peers: List of peer addresses to connect to
            tor_socks: Tor SOCKS5 proxy address (e.g., "127.0.0.1:9050")
        """
        self.network = network
        self.data_dir = data_dir
        self.listen_port = listen_port
        self.peers = peers or []
        self.tor_socks = tor_socks

    def get_chain_params(self) -> dict[str, Any]:
        """Get chain-specific parameters."""
        params = {
            "mainnet": {
                "default_port": 8333,
                "dns_seeds": [
                    "seed.bitcoin.sipa.be",
                    "dnsseed.bluematt.me",
                    "dnsseed.bitcoin.dashjr.org",
                    "seed.bitcoinstats.com",
                    "seed.bitcoin.jonasschnelli.ch",
                    "seed.btc.petertodd.net",
                ],
            },
            "testnet": {
                "default_port": 18333,
                "dns_seeds": [
                    "testnet-seed.bitcoin.jonasschnelli.ch",
                    "seed.tbtc.petertodd.net",
                    "testnet-seed.bluematt.me",
                ],
            },
            "signet": {
                "default_port": 38333,
                "dns_seeds": [
                    "seed.signet.bitcoin.sprovoost.nl",
                ],
            },
            "regtest": {
                "default_port": 18444,
                "dns_seeds": [],
            },
        }
        return params.get(self.network, params["mainnet"])

    def to_args(self) -> list[str]:
        """Generate command-line arguments for neutrino daemon."""
        args = [
            f"--datadir={self.data_dir}",
            f"--{self.network}",
            f"--restlisten=0.0.0.0:{self.listen_port}",
        ]

        if self.tor_socks:
            args.append(f"--proxy={self.tor_socks}")

        for peer in self.peers:
            args.append(f"--connect={peer}")

        return args

Configuration for running a neutrino daemon.

This configuration can be used to start a neutrino process programmatically or generate a config file.

Initialize neutrino configuration.

Args

network
Bitcoin network (mainnet, testnet, regtest, signet)
data_dir
Directory for neutrino data
listen_port
Port for REST API
peers
List of peer addresses to connect to
tor_socks
Tor SOCKS5 proxy address (e.g., "127.0.0.1:9050")

Methods

def get_chain_params(self) ‑> dict[str, typing.Any]
def get_chain_params(self) -> dict[str, Any]:
    """Get chain-specific parameters."""
    params = {
        "mainnet": {
            "default_port": 8333,
            "dns_seeds": [
                "seed.bitcoin.sipa.be",
                "dnsseed.bluematt.me",
                "dnsseed.bitcoin.dashjr.org",
                "seed.bitcoinstats.com",
                "seed.bitcoin.jonasschnelli.ch",
                "seed.btc.petertodd.net",
            ],
        },
        "testnet": {
            "default_port": 18333,
            "dns_seeds": [
                "testnet-seed.bitcoin.jonasschnelli.ch",
                "seed.tbtc.petertodd.net",
                "testnet-seed.bluematt.me",
            ],
        },
        "signet": {
            "default_port": 38333,
            "dns_seeds": [
                "seed.signet.bitcoin.sprovoost.nl",
            ],
        },
        "regtest": {
            "default_port": 18444,
            "dns_seeds": [],
        },
    }
    return params.get(self.network, params["mainnet"])

Get chain-specific parameters.
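Because the lookup uses `params.get(self.network, params["mainnet"])`, an unrecognised network name does not raise; it returns the mainnet entry. A minimal standalone sketch of that behaviour, with the table trimmed to ports only. `CHAIN_PARAMS` and `chain_params_for` are hypothetical names used for illustration and are not part of jmwallet:

```python
from typing import Any

# Trimmed copy of the table shown above (ports only, DNS seeds omitted).
CHAIN_PARAMS: dict[str, dict[str, Any]] = {
    "mainnet": {"default_port": 8333},
    "testnet": {"default_port": 18333},
    "signet": {"default_port": 38333},
    "regtest": {"default_port": 18444},
}


def chain_params_for(network: str) -> dict[str, Any]:
    """Hypothetical helper mirroring the dict.get() fallback used above."""
    return CHAIN_PARAMS.get(network, CHAIN_PARAMS["mainnet"])


print(chain_params_for("signet")["default_port"])  # 38333
print(chain_params_for("bogus")["default_port"])   # 8333, unknown names fall back to mainnet
```

A caller that wants a misspelled network name to fail loudly may prefer to validate the name before relying on the returned parameters.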

def to_args(self) ‑> list[str]
def to_args(self) -> list[str]:
    """Generate command-line arguments for neutrino daemon."""
    args = [
        f"--datadir={self.data_dir}",
        f"--{self.network}",
        f"--restlisten=0.0.0.0:{self.listen_port}",
    ]

    if self.tor_socks:
        args.append(f"--proxy={self.tor_socks}")

    for peer in self.peers:
        args.append(f"--connect={peer}")

    return args

Generate command-line arguments for neutrino daemon.

class Transaction (txid: str, raw: str, confirmations: int, block_height: int | None = None, block_time: int | None = None)
@dataclass
class Transaction:
    txid: str
    raw: str
    confirmations: int
    block_height: int | None = None
    block_time: int | None = None

Instance variables

var block_height : int | None

var block_time : int | None

var confirmations : int

var raw : str

var txid : str

class UTXO (txid: str, vout: int, value: int, address: str, confirmations: int, scriptpubkey: str, height: int | None = None)
@dataclass
class UTXO:
    txid: str
    vout: int
    value: int
    address: str
    confirmations: int
    scriptpubkey: str
    height: int | None = None

Instance variables

var address : str

var confirmations : int

var height : int | None

var scriptpubkey : str

var txid : str

var value : int

var vout : int

class UTXOVerificationResult (valid: bool, value: int = 0, confirmations: int = 0, error: str | None = None, scriptpubkey_matches: bool = False)
@dataclass
class UTXOVerificationResult:
    """
    Result of UTXO verification with metadata.

    Used by neutrino_compat feature for Neutrino-compatible verification.
    """

    valid: bool
    value: int = 0
    confirmations: int = 0
    error: str | None = None
    scriptpubkey_matches: bool = False

Result of UTXO verification with metadata.

Used by neutrino_compat feature for Neutrino-compatible verification.

Instance variables

var confirmations : int

var error : str | None

var scriptpubkey_matches : bool

var valid : bool

var value : int