Module jmwallet.backends
Blockchain backend implementations.
Available backends: - BitcoinCoreBackend: Full node via Bitcoin Core RPC (no wallet, uses scantxoutset) - NeutrinoBackend: Lightweight BIP157/BIP158 SPV client - MempoolBackend: Mempool.space API (third-party, no setup required)
Neutrino Compatibility: All backends support verify_utxo_with_metadata() for Neutrino-compatible UTXO verification. Check backend.requires_neutrino_metadata() to determine if the backend needs scriptPubKey/blockheight hints from peers.
Sub-modules
jmwallet.backends.base-
Base blockchain backend interface.
jmwallet.backends.bitcoin_core-
Bitcoin Core RPC blockchain backend. Uses RPC calls but NOT wallet functionality (no BDB dependency).
jmwallet.backends.mempool-
Mempool.space API blockchain backend. Beginner-friendly backend that requires no setup.
jmwallet.backends.neutrino-
Neutrino (BIP157/BIP158) light client blockchain backend …
Classes
class BitcoinCoreBackend (rpc_url: str = 'http://127.0.0.1:18443',
rpc_user: str = 'rpcuser',
rpc_password: str = 'rpcpassword',
scan_timeout: float = 300.0)-
Expand source code
class BitcoinCoreBackend(BlockchainBackend): """ Blockchain backend using Bitcoin Core RPC. Does NOT use Bitcoin Core wallet (avoids BDB issues). Uses scantxoutset and other non-wallet RPC methods. """ def __init__( self, rpc_url: str = "http://127.0.0.1:18443", rpc_user: str = "rpcuser", rpc_password: str = "rpcpassword", scan_timeout: float = SCAN_RPC_TIMEOUT, ): self.rpc_url = rpc_url.rstrip("/") self.rpc_user = rpc_user self.rpc_password = rpc_password self.scan_timeout = scan_timeout # Client for regular RPC calls self.client = httpx.AsyncClient(timeout=DEFAULT_RPC_TIMEOUT, auth=(rpc_user, rpc_password)) # Separate client for long-running scans self._scan_client = httpx.AsyncClient(timeout=scan_timeout, auth=(rpc_user, rpc_password)) self._request_id = 0 async def _rpc_call( self, method: str, params: list | None = None, client: httpx.AsyncClient | None = None, ) -> Any: """ Make an RPC call to Bitcoin Core. Args: method: RPC method name params: Method parameters client: Optional httpx client (uses default client if not provided) Returns: RPC result Raises: ValueError: On RPC errors httpx.HTTPError: On connection/timeout errors """ self._request_id += 1 payload = { "jsonrpc": "2.0", "id": self._request_id, "method": method, "params": params or [], } use_client = client or self.client try: response = await use_client.post(self.rpc_url, json=payload) response.raise_for_status() data = response.json() if "error" in data and data["error"]: error_info = data["error"] error_code = error_info.get("code", "unknown") error_msg = error_info.get("message", str(error_info)) raise ValueError(f"RPC error {error_code}: {error_msg}") return data.get("result") except httpx.TimeoutException as e: logger.error(f"RPC call timed out: {method} - {e}") raise except httpx.HTTPError as e: logger.error(f"RPC call failed: {method} - {e}") raise async def _scantxoutset_with_retry( self, descriptors: Sequence[str | dict[str, Any]] ) -> dict[str, Any] | None: """ Execute scantxoutset with retry logic for handling concurrent scan conflicts. Bitcoin Core only allows one scantxoutset at a time. This method: 1. Checks if a scan is already in progress 2. If so, waits for it to complete (via status polling) before starting ours 3. Starts our scan with extended timeout for mainnet Args: descriptors: List of output descriptors to scan for. Can be: - Simple strings: "addr(bc1q...)" - Dicts with range: {"desc": "wpkh([fp/84'/0'/0'/0/*)", "range": [0, 999]} Returns: Scan result dict or None if all retries failed """ for attempt in range(SCAN_MAX_RETRIES): try: # First check if a scan is already running status = await self._rpc_call("scantxoutset", ["status"]) if status is not None: # A scan is in progress - wait for it # Bitcoin Core returns progress as 0-100, not 0-1 progress = status.get("progress", 0) / 100.0 logger.debug( f"Another scan in progress ({progress:.1%}), waiting... " f"(attempt {attempt + 1}/{SCAN_MAX_RETRIES})" ) if attempt < SCAN_MAX_RETRIES - 1: await asyncio.sleep(SCAN_STATUS_POLL_INTERVAL) continue # Start our scan with extended timeout logger.debug(f"Starting UTXO scan for {len(descriptors)} descriptor(s)...") if SENSITIVE_LOGGING: logger.debug(f"Descriptors for scan: {descriptors}") result = await self._rpc_call( "scantxoutset", ["start", descriptors], client=self._scan_client ) if result: unspent_count = len(result.get("unspents", [])) total_amount = result.get("total_amount", 0) logger.debug( f"Scan completed: found {unspent_count} UTXOs, total {total_amount:.8f} BTC" ) if SENSITIVE_LOGGING and unspent_count > 0: logger.debug(f"Scan result: {result}") return result except ValueError as e: error_str = str(e) # Check for "scan already in progress" error (code -8) if "code': -8" in error_str or "Scan already in progress" in error_str: if attempt < SCAN_MAX_RETRIES - 1: delay = SCAN_BASE_DELAY * (2**attempt) + random.uniform(0, 0.5) logger.debug( f"Scan in progress (RPC error), retrying in {delay:.2f}s " f"(attempt {attempt + 1}/{SCAN_MAX_RETRIES})" ) await asyncio.sleep(delay) continue else: logger.warning( f"Max retries ({SCAN_MAX_RETRIES}) exceeded waiting for scan slot" ) return None else: # Other RPC errors - log and re-raise logger.error(f"scantxoutset RPC error: {error_str}") raise except httpx.TimeoutException: # Timeout during scan - this is a real failure on mainnet logger.error( f"scantxoutset timed out after {self.scan_timeout}s. " "Try increasing scan_timeout for mainnet." ) return None except Exception as e: logger.error(f"Unexpected error during scantxoutset: {type(e).__name__}: {e}") raise logger.warning(f"scantxoutset failed after {SCAN_MAX_RETRIES} attempts") return None async def get_utxos(self, addresses: list[str]) -> list[UTXO]: utxos: list[UTXO] = [] if not addresses: return utxos # Get tip height once for confirmation calculation try: tip_height = await self.get_block_height() except Exception as e: logger.error(f"Failed to get block height for UTXO scan: {e}") return utxos # Process in batches to avoid huge RPC requests batch_size = 100 for i in range(0, len(addresses), batch_size): chunk = addresses[i : i + batch_size] descriptors = [f"addr({addr})" for addr in chunk] if SENSITIVE_LOGGING: logger.debug(f"Scanning addresses batch {i // batch_size + 1}: {chunk}") try: # Scan for all addresses in this chunk at once (with retry for conflicts) result = await self._scantxoutset_with_retry(descriptors) if not result or "unspents" not in result: continue for utxo_data in result["unspents"]: confirmations = 0 if utxo_data.get("height", 0) > 0: confirmations = tip_height - utxo_data["height"] + 1 # Extract address from descriptor "addr(ADDRESS)#checksum" or "addr(ADDRESS)" desc = utxo_data.get("desc", "") # Remove checksum if present if "#" in desc: desc = desc.split("#")[0] address = "" if desc.startswith("addr(") and desc.endswith(")"): address = desc[5:-1] else: # Only log warning if we really can't parse it (and it's not empty) if desc: logger.warning(f"Failed to parse address from descriptor: '{desc}'") utxo = UTXO( txid=utxo_data["txid"], vout=utxo_data["vout"], value=btc_to_sats(utxo_data["amount"]), address=address, confirmations=confirmations, scriptpubkey=utxo_data.get("scriptPubKey", ""), height=utxo_data.get("height"), ) utxos.append(utxo) logger.debug( f"Scanned {len(chunk)} addresses, found {len(result['unspents'])} UTXOs" ) except Exception as e: logger.warning(f"Failed to scan UTXOs for batch starting {chunk[0]}: {e}") continue return utxos async def scan_descriptors( self, descriptors: Sequence[str | dict[str, Any]] ) -> dict[str, Any] | None: """ Scan the UTXO set using output descriptors. This is much more efficient than scanning individual addresses, especially for HD wallets where you can use xpub descriptors with ranges to scan thousands of addresses in a single UTXO set pass. Example descriptors: - "addr(bc1q...)" - single address - "wpkh(xpub.../0/*)" - HD wallet external addresses (default range 0-1000) - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range Args: descriptors: List of output descriptors (strings or dicts with range) Returns: Raw scan result dict from Bitcoin Core, or None on failure. Result includes: - success: bool - txouts: number of UTXOs scanned - height: current block height - unspents: list of found UTXOs with txid, vout, scriptPubKey, desc (matched descriptor), amount, height - total_amount: sum of all found UTXOs """ if not descriptors: return {"success": True, "unspents": [], "total_amount": 0} logger.info(f"Starting descriptor scan with {len(descriptors)} descriptor(s)...") result = await self._scantxoutset_with_retry(descriptors) if result: unspent_count = len(result.get("unspents", [])) total = result.get("total_amount", 0) logger.info( f"Descriptor scan complete: found {unspent_count} UTXOs, total {total:.8f} BTC" ) else: logger.warning("Descriptor scan failed or returned no results") return result async def get_address_balance(self, address: str) -> int: utxos = await self.get_utxos([address]) balance = sum(utxo.value for utxo in utxos) logger.debug(f"Balance for {address}: {balance} sats") return balance async def broadcast_transaction(self, tx_hex: str) -> str: try: txid = await self._rpc_call("sendrawtransaction", [tx_hex]) logger.info(f"Broadcast transaction: {txid}") return txid except Exception as e: logger.error(f"Failed to broadcast transaction: {e}") raise ValueError(f"Broadcast failed: {e}") from e async def get_transaction(self, txid: str) -> Transaction | None: try: tx_data = await self._rpc_call("getrawtransaction", [txid, True]) if not tx_data: return None confirmations = tx_data.get("confirmations", 0) block_height = None block_time = None if "blockhash" in tx_data: block_info = await self._rpc_call("getblockheader", [tx_data["blockhash"]]) block_height = block_info.get("height") block_time = block_info.get("time") raw_hex = tx_data.get("hex", "") return Transaction( txid=txid, raw=raw_hex, confirmations=confirmations, block_height=block_height, block_time=block_time, ) except Exception as e: logger.warning(f"Failed to fetch transaction {txid}: {e}") return None async def estimate_fee(self, target_blocks: int) -> int: try: result = await self._rpc_call("estimatesmartfee", [target_blocks]) if "feerate" in result: btc_per_kb = result["feerate"] # Convert BTC/kB to sat/vB sat_per_vbyte = round(btc_to_sats(btc_per_kb) / 1000) logger.debug(f"Estimated fee for {target_blocks} blocks: {sat_per_vbyte} sat/vB") return sat_per_vbyte else: logger.warning("Fee estimation unavailable, using fallback") return 10 except Exception as e: logger.warning(f"Failed to estimate fee: {e}, using fallback") return 10 async def get_block_height(self) -> int: try: info = await self._rpc_call("getblockchaininfo", []) height = info.get("blocks", 0) logger.debug(f"Current block height: {height}") return height except Exception as e: logger.error(f"Failed to fetch block height: {e}") raise async def get_block_time(self, block_height: int) -> int: try: block_hash = await self.get_block_hash(block_height) block_header = await self._rpc_call("getblockheader", [block_hash]) timestamp = block_header.get("time", 0) logger.debug(f"Block {block_height} timestamp: {timestamp}") return timestamp except Exception as e: logger.error(f"Failed to fetch block time for height {block_height}: {e}") raise async def get_block_hash(self, block_height: int) -> str: try: block_hash = await self._rpc_call("getblockhash", [block_height]) logger.debug(f"Block hash for height {block_height}: {block_hash}") return block_hash except Exception as e: logger.error(f"Failed to fetch block hash for height {block_height}: {e}") raise async def get_utxo(self, txid: str, vout: int) -> UTXO | None: """Get a specific UTXO from the blockchain UTXO set using gettxout. Returns None if the UTXO does not exist or has been spent. If not found in confirmed UTXO set, checks mempool for unconfirmed transactions. """ try: # gettxout returns None if UTXO doesn't exist or is spent # include_mempool=True checks both confirmed and unconfirmed outputs result = await self._rpc_call("gettxout", [txid, vout, True]) if result is None: # Not found in UTXO set - check if it's in mempool (unconfirmed) logger.debug( f"UTXO {txid}:{vout} not found in confirmed UTXO set, checking mempool..." ) try: # Get raw transaction from mempool tx_data = await self._rpc_call("getrawtransaction", [txid, True]) if tx_data and "vout" in tx_data: # Check if the vout exists and hasn't been spent if vout < len(tx_data["vout"]): vout_data = tx_data["vout"][vout] value = btc_to_sats(vout_data.get("value", 0)) # Extract address from scriptPubKey script_pub_key = vout_data.get("scriptPubKey", {}) address = script_pub_key.get("address", "") # For multiple addresses (e.g., multisig), join them if not address and "addresses" in script_pub_key: addresses = script_pub_key.get("addresses", []) address = addresses[0] if addresses else "" scriptpubkey = script_pub_key.get("hex", "") # Unconfirmed transaction has 0 confirmations logger.info(f"Found UTXO {txid}:{vout} in mempool (unconfirmed)") return UTXO( txid=txid, vout=vout, value=value, address=address, confirmations=0, scriptpubkey=scriptpubkey, height=None, ) except Exception as mempool_err: logger.debug(f"UTXO {txid}:{vout} not in mempool either: {mempool_err}") logger.debug(f"UTXO {txid}:{vout} not found (spent or doesn't exist)") return None # Get tip height for confirmation calculation tip_height = await self.get_block_height() confirmations = result.get("confirmations", 0) value = btc_to_sats(result.get("value", 0)) # BTC to sats # Extract address from scriptPubKey script_pub_key = result.get("scriptPubKey", {}) address = script_pub_key.get("address", "") scriptpubkey = script_pub_key.get("hex", "") # Calculate height from confirmations height = None if confirmations > 0: height = tip_height - confirmations + 1 return UTXO( txid=txid, vout=vout, value=value, address=address, confirmations=confirmations, scriptpubkey=scriptpubkey, height=height, ) except Exception as e: logger.error(f"Failed to get UTXO {txid}:{vout}: {e}") return None def can_provide_neutrino_metadata(self) -> bool: """ Bitcoin Core can provide Neutrino-compatible metadata. Full node can access scriptpubkey and blockheight for all UTXOs, allowing Neutrino takers to use our makers. Returns: True - Bitcoin Core always provides extended UTXO metadata """ return True async def close(self) -> None: await self.client.aclose() await self._scan_client.aclose()Blockchain backend using Bitcoin Core RPC. Does NOT use Bitcoin Core wallet (avoids BDB issues). Uses scantxoutset and other non-wallet RPC methods.
Ancestors
- BlockchainBackend
- abc.ABC
Methods
def can_provide_neutrino_metadata(self) ‑> bool-
Expand source code
def can_provide_neutrino_metadata(self) -> bool: """ Bitcoin Core can provide Neutrino-compatible metadata. Full node can access scriptpubkey and blockheight for all UTXOs, allowing Neutrino takers to use our makers. Returns: True - Bitcoin Core always provides extended UTXO metadata """ return TrueBitcoin Core can provide Neutrino-compatible metadata.
Full node can access scriptpubkey and blockheight for all UTXOs, allowing Neutrino takers to use our makers.
Returns
True - Bitcoin Core always provides extended UTXO metadata
async def get_utxo(self, txid: str, vout: int) ‑> UTXO | None-
Expand source code
async def get_utxo(self, txid: str, vout: int) -> UTXO | None: """Get a specific UTXO from the blockchain UTXO set using gettxout. Returns None if the UTXO does not exist or has been spent. If not found in confirmed UTXO set, checks mempool for unconfirmed transactions. """ try: # gettxout returns None if UTXO doesn't exist or is spent # include_mempool=True checks both confirmed and unconfirmed outputs result = await self._rpc_call("gettxout", [txid, vout, True]) if result is None: # Not found in UTXO set - check if it's in mempool (unconfirmed) logger.debug( f"UTXO {txid}:{vout} not found in confirmed UTXO set, checking mempool..." ) try: # Get raw transaction from mempool tx_data = await self._rpc_call("getrawtransaction", [txid, True]) if tx_data and "vout" in tx_data: # Check if the vout exists and hasn't been spent if vout < len(tx_data["vout"]): vout_data = tx_data["vout"][vout] value = btc_to_sats(vout_data.get("value", 0)) # Extract address from scriptPubKey script_pub_key = vout_data.get("scriptPubKey", {}) address = script_pub_key.get("address", "") # For multiple addresses (e.g., multisig), join them if not address and "addresses" in script_pub_key: addresses = script_pub_key.get("addresses", []) address = addresses[0] if addresses else "" scriptpubkey = script_pub_key.get("hex", "") # Unconfirmed transaction has 0 confirmations logger.info(f"Found UTXO {txid}:{vout} in mempool (unconfirmed)") return UTXO( txid=txid, vout=vout, value=value, address=address, confirmations=0, scriptpubkey=scriptpubkey, height=None, ) except Exception as mempool_err: logger.debug(f"UTXO {txid}:{vout} not in mempool either: {mempool_err}") logger.debug(f"UTXO {txid}:{vout} not found (spent or doesn't exist)") return None # Get tip height for confirmation calculation tip_height = await self.get_block_height() confirmations = result.get("confirmations", 0) value = btc_to_sats(result.get("value", 0)) # BTC to sats # Extract address from scriptPubKey script_pub_key = result.get("scriptPubKey", {}) address = script_pub_key.get("address", "") scriptpubkey = script_pub_key.get("hex", "") # Calculate height from confirmations height = None if confirmations > 0: height = tip_height - confirmations + 1 return UTXO( txid=txid, vout=vout, value=value, address=address, confirmations=confirmations, scriptpubkey=scriptpubkey, height=height, ) except Exception as e: logger.error(f"Failed to get UTXO {txid}:{vout}: {e}") return NoneGet a specific UTXO from the blockchain UTXO set using gettxout. Returns None if the UTXO does not exist or has been spent.
If not found in confirmed UTXO set, checks mempool for unconfirmed transactions.
async def scan_descriptors(self, descriptors: Sequence[str | dict[str, Any]]) ‑> dict[str, typing.Any] | None-
Expand source code
async def scan_descriptors( self, descriptors: Sequence[str | dict[str, Any]] ) -> dict[str, Any] | None: """ Scan the UTXO set using output descriptors. This is much more efficient than scanning individual addresses, especially for HD wallets where you can use xpub descriptors with ranges to scan thousands of addresses in a single UTXO set pass. Example descriptors: - "addr(bc1q...)" - single address - "wpkh(xpub.../0/*)" - HD wallet external addresses (default range 0-1000) - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range Args: descriptors: List of output descriptors (strings or dicts with range) Returns: Raw scan result dict from Bitcoin Core, or None on failure. Result includes: - success: bool - txouts: number of UTXOs scanned - height: current block height - unspents: list of found UTXOs with txid, vout, scriptPubKey, desc (matched descriptor), amount, height - total_amount: sum of all found UTXOs """ if not descriptors: return {"success": True, "unspents": [], "total_amount": 0} logger.info(f"Starting descriptor scan with {len(descriptors)} descriptor(s)...") result = await self._scantxoutset_with_retry(descriptors) if result: unspent_count = len(result.get("unspents", [])) total = result.get("total_amount", 0) logger.info( f"Descriptor scan complete: found {unspent_count} UTXOs, total {total:.8f} BTC" ) else: logger.warning("Descriptor scan failed or returned no results") return resultScan the UTXO set using output descriptors.
This is much more efficient than scanning individual addresses, especially for HD wallets where you can use xpub descriptors with ranges to scan thousands of addresses in a single UTXO set pass.
Example descriptors: - "addr(bc1q…)" - single address - "wpkh(xpub…/0/)" - HD wallet external addresses (default range 0-1000) - {"desc": "wpkh(xpub…/0/)", "range": [0, 999]} - explicit range
Args
descriptors- List of output descriptors (strings or dicts with range)
Returns
Raw scan result dict from Bitcoin Core, or None on failure. Result includes: - success: bool - txouts: number of UTXOs scanned - height: current block height - unspents: list of found UTXOs with txid, vout, scriptPubKey, desc (matched descriptor), amount, height - total_amount: sum of all found UTXOs
Inherited members
class BlockchainBackend-
Expand source code
class BlockchainBackend(ABC): """ Abstract blockchain backend interface. Implementations provide access to blockchain data without requiring Bitcoin Core wallet functionality (avoiding BerkeleyDB issues). """ @abstractmethod async def get_utxos(self, addresses: list[str]) -> list[UTXO]: """Get UTXOs for given addresses""" @abstractmethod async def get_address_balance(self, address: str) -> int: """Get balance for an address in satoshis""" @abstractmethod async def broadcast_transaction(self, tx_hex: str) -> str: """Broadcast transaction, returns txid""" @abstractmethod async def get_transaction(self, txid: str) -> Transaction | None: """Get transaction by txid""" @abstractmethod async def estimate_fee(self, target_blocks: int) -> int: """Estimate fee in sat/vbyte for target confirmation blocks""" @abstractmethod async def get_block_height(self) -> int: """Get current blockchain height""" @abstractmethod async def get_block_time(self, block_height: int) -> int: """Get block time (unix timestamp) for given height""" @abstractmethod async def get_block_hash(self, block_height: int) -> str: """Get block hash for given height""" @abstractmethod async def get_utxo(self, txid: str, vout: int) -> UTXO | None: """Get a specific UTXO from the blockchain UTXO set (gettxout). Returns None if the UTXO does not exist or has been spent.""" async def scan_descriptors( self, descriptors: Sequence[str | dict[str, Any]] ) -> dict[str, Any] | None: """ Scan the UTXO set using output descriptors. This is an efficient alternative to scanning individual addresses, especially useful for HD wallets where xpub descriptors with ranges can scan thousands of addresses in a single UTXO set pass. Example descriptors: - "addr(bc1q...)" - single address - "wpkh(xpub.../0/*)" - HD wallet addresses (default range 0-1000) - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range Args: descriptors: List of output descriptors (strings or dicts with range) Returns: Scan result dict with: - success: bool - unspents: list of found UTXOs - total_amount: sum of all found UTXOs Returns None if not supported or on failure. Note: Not all backends support descriptor scanning. The default implementation returns None. Override in backends that support it (e.g., Bitcoin Core). """ # Default: not supported return None async def verify_utxo_with_metadata( self, txid: str, vout: int, scriptpubkey: str, blockheight: int, ) -> UTXOVerificationResult: """ Verify a UTXO using provided metadata (neutrino_compat feature). This method allows light clients to verify UTXOs without needing arbitrary blockchain queries by using metadata provided by the peer. The implementation should: 1. Use scriptpubkey to add the UTXO to watch list (for Neutrino) 2. Use blockheight as a hint for efficient rescan 3. Verify the UTXO exists with matching scriptpubkey 4. Return the UTXO value and confirmations Default implementation falls back to get_utxo() for full node backends. Args: txid: Transaction ID vout: Output index scriptpubkey: Expected scriptPubKey (hex) blockheight: Block height where UTXO was confirmed Returns: UTXOVerificationResult with verification status and UTXO data """ # Default implementation for full node backends # Just uses get_utxo() directly since we can query any UTXO utxo = await self.get_utxo(txid, vout) if utxo is None: return UTXOVerificationResult( valid=False, error="UTXO not found or spent", ) # Verify scriptpubkey matches scriptpubkey_matches = utxo.scriptpubkey.lower() == scriptpubkey.lower() if not scriptpubkey_matches: return UTXOVerificationResult( valid=False, value=utxo.value, confirmations=utxo.confirmations, error="ScriptPubKey mismatch", scriptpubkey_matches=False, ) return UTXOVerificationResult( valid=True, value=utxo.value, confirmations=utxo.confirmations, scriptpubkey_matches=True, ) def requires_neutrino_metadata(self) -> bool: """ Check if this backend requires Neutrino-compatible metadata for UTXO verification. Full node backends can verify any UTXO directly. Light client backends need scriptpubkey and blockheight hints. Returns: True if backend requires metadata for verification """ return False def can_provide_neutrino_metadata(self) -> bool: """ Check if this backend can provide Neutrino-compatible metadata to peers. This determines whether to advertise neutrino_compat feature to the network. Backends should return True if they can provide extended UTXO format with scriptpubkey and blockheight fields. Full node backends (Bitcoin Core) can provide this metadata. Light client backends (Neutrino) typically cannot reliably provide it for all UTXOs. Returns: True if backend can provide scriptpubkey and blockheight for its UTXOs """ # Default: Full nodes can provide metadata, light clients cannot return not self.requires_neutrino_metadata() async def verify_tx_output( self, txid: str, vout: int, address: str, start_height: int | None = None, ) -> bool: """ Verify that a specific transaction output exists (was broadcast and confirmed). This is useful for verifying a transaction was successfully broadcast when we know at least one of its output addresses (e.g., our coinjoin destination). For full node backends, this uses get_transaction(). For light clients (neutrino), this uses UTXO lookup with the address hint. Args: txid: Transaction ID to verify vout: Output index to check address: The address that should own this output start_height: Optional block height hint for light clients (improves performance) Returns: True if the output exists (transaction was broadcast), False otherwise """ # Default implementation for full node backends tx = await self.get_transaction(txid) return tx is not None async def close(self) -> None: """Close backend connection""" passAbstract blockchain backend interface. Implementations provide access to blockchain data without requiring Bitcoin Core wallet functionality (avoiding BerkeleyDB issues).
Ancestors
- abc.ABC
Subclasses
Methods
async def broadcast_transaction(self, tx_hex: str) ‑> str-
Expand source code
@abstractmethod async def broadcast_transaction(self, tx_hex: str) -> str: """Broadcast transaction, returns txid"""Broadcast transaction, returns txid
def can_provide_neutrino_metadata(self) ‑> bool-
Expand source code
def can_provide_neutrino_metadata(self) -> bool: """ Check if this backend can provide Neutrino-compatible metadata to peers. This determines whether to advertise neutrino_compat feature to the network. Backends should return True if they can provide extended UTXO format with scriptpubkey and blockheight fields. Full node backends (Bitcoin Core) can provide this metadata. Light client backends (Neutrino) typically cannot reliably provide it for all UTXOs. Returns: True if backend can provide scriptpubkey and blockheight for its UTXOs """ # Default: Full nodes can provide metadata, light clients cannot return not self.requires_neutrino_metadata()Check if this backend can provide Neutrino-compatible metadata to peers.
This determines whether to advertise neutrino_compat feature to the network. Backends should return True if they can provide extended UTXO format with scriptpubkey and blockheight fields.
Full node backends (Bitcoin Core) can provide this metadata. Light client backends (Neutrino) typically cannot reliably provide it for all UTXOs.
Returns
True if backend can provide scriptpubkey and blockheight for its UTXOs
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """Close backend connection""" passClose backend connection
async def estimate_fee(self, target_blocks: int) ‑> int-
Expand source code
@abstractmethod async def estimate_fee(self, target_blocks: int) -> int: """Estimate fee in sat/vbyte for target confirmation blocks"""Estimate fee in sat/vbyte for target confirmation blocks
async def get_address_balance(self, address: str) ‑> int-
Expand source code
@abstractmethod async def get_address_balance(self, address: str) -> int: """Get balance for an address in satoshis"""Get balance for an address in satoshis
async def get_block_hash(self, block_height: int) ‑> str-
Expand source code
@abstractmethod async def get_block_hash(self, block_height: int) -> str: """Get block hash for given height"""Get block hash for given height
async def get_block_height(self) ‑> int-
Expand source code
@abstractmethod async def get_block_height(self) -> int: """Get current blockchain height"""Get current blockchain height
async def get_block_time(self, block_height: int) ‑> int-
Expand source code
@abstractmethod async def get_block_time(self, block_height: int) -> int: """Get block time (unix timestamp) for given height"""Get block time (unix timestamp) for given height
async def get_transaction(self, txid: str) ‑> Transaction | None-
Expand source code
@abstractmethod async def get_transaction(self, txid: str) -> Transaction | None: """Get transaction by txid"""Get transaction by txid
async def get_utxo(self, txid: str, vout: int) ‑> UTXO | None-
Expand source code
@abstractmethod async def get_utxo(self, txid: str, vout: int) -> UTXO | None: """Get a specific UTXO from the blockchain UTXO set (gettxout). Returns None if the UTXO does not exist or has been spent."""Get a specific UTXO from the blockchain UTXO set (gettxout). Returns None if the UTXO does not exist or has been spent.
async def get_utxos(self, addresses: list[str]) ‑> list[UTXO]-
Expand source code
@abstractmethod async def get_utxos(self, addresses: list[str]) -> list[UTXO]: """Get UTXOs for given addresses"""Get UTXOs for given addresses
def requires_neutrino_metadata(self) ‑> bool-
Expand source code
def requires_neutrino_metadata(self) -> bool: """ Check if this backend requires Neutrino-compatible metadata for UTXO verification. Full node backends can verify any UTXO directly. Light client backends need scriptpubkey and blockheight hints. Returns: True if backend requires metadata for verification """ return FalseCheck if this backend requires Neutrino-compatible metadata for UTXO verification.
Full node backends can verify any UTXO directly. Light client backends need scriptpubkey and blockheight hints.
Returns
True if backend requires metadata for verification
async def scan_descriptors(self, descriptors: Sequence[str | dict[str, Any]]) ‑> dict[str, typing.Any] | None-
Expand source code
async def scan_descriptors( self, descriptors: Sequence[str | dict[str, Any]] ) -> dict[str, Any] | None: """ Scan the UTXO set using output descriptors. This is an efficient alternative to scanning individual addresses, especially useful for HD wallets where xpub descriptors with ranges can scan thousands of addresses in a single UTXO set pass. Example descriptors: - "addr(bc1q...)" - single address - "wpkh(xpub.../0/*)" - HD wallet addresses (default range 0-1000) - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range Args: descriptors: List of output descriptors (strings or dicts with range) Returns: Scan result dict with: - success: bool - unspents: list of found UTXOs - total_amount: sum of all found UTXOs Returns None if not supported or on failure. Note: Not all backends support descriptor scanning. The default implementation returns None. Override in backends that support it (e.g., Bitcoin Core). """ # Default: not supported return NoneScan the UTXO set using output descriptors.
This is an efficient alternative to scanning individual addresses, especially useful for HD wallets where xpub descriptors with ranges can scan thousands of addresses in a single UTXO set pass.
Example descriptors: - "addr(bc1q…)" - single address - "wpkh(xpub…/0/)" - HD wallet addresses (default range 0-1000) - {"desc": "wpkh(xpub…/0/)", "range": [0, 999]} - explicit range
Args
descriptors- List of output descriptors (strings or dicts with range)
Returns
Scan result dict with: - success: bool - unspents: list of found UTXOs - total_amount: sum of all found UTXOs Returns None if not supported or on failure.
Note
Not all backends support descriptor scanning. The default implementation returns None. Override in backends that support it (e.g., Bitcoin Core).
async def verify_tx_output(self, txid: str, vout: int, address: str, start_height: int | None = None) ‑> bool-
Expand source code
async def verify_tx_output( self, txid: str, vout: int, address: str, start_height: int | None = None, ) -> bool: """ Verify that a specific transaction output exists (was broadcast and confirmed). This is useful for verifying a transaction was successfully broadcast when we know at least one of its output addresses (e.g., our coinjoin destination). For full node backends, this uses get_transaction(). For light clients (neutrino), this uses UTXO lookup with the address hint. Args: txid: Transaction ID to verify vout: Output index to check address: The address that should own this output start_height: Optional block height hint for light clients (improves performance) Returns: True if the output exists (transaction was broadcast), False otherwise """ # Default implementation for full node backends tx = await self.get_transaction(txid) return tx is not NoneVerify that a specific transaction output exists (was broadcast and confirmed).
This is useful for verifying a transaction was successfully broadcast when we know at least one of its output addresses (e.g., our coinjoin destination).
For full node backends, this uses get_transaction(). For light clients (neutrino), this uses UTXO lookup with the address hint.
Args
txid- Transaction ID to verify
vout- Output index to check
address- The address that should own this output
start_height- Optional block height hint for light clients (improves performance)
Returns
True if the output exists (transaction was broadcast), False otherwise
async def verify_utxo_with_metadata(self, txid: str, vout: int, scriptpubkey: str, blockheight: int) ‑> UTXOVerificationResult-
Expand source code
async def verify_utxo_with_metadata( self, txid: str, vout: int, scriptpubkey: str, blockheight: int, ) -> UTXOVerificationResult: """ Verify a UTXO using provided metadata (neutrino_compat feature). This method allows light clients to verify UTXOs without needing arbitrary blockchain queries by using metadata provided by the peer. The implementation should: 1. Use scriptpubkey to add the UTXO to watch list (for Neutrino) 2. Use blockheight as a hint for efficient rescan 3. Verify the UTXO exists with matching scriptpubkey 4. Return the UTXO value and confirmations Default implementation falls back to get_utxo() for full node backends. Args: txid: Transaction ID vout: Output index scriptpubkey: Expected scriptPubKey (hex) blockheight: Block height where UTXO was confirmed Returns: UTXOVerificationResult with verification status and UTXO data """ # Default implementation for full node backends # Just uses get_utxo() directly since we can query any UTXO utxo = await self.get_utxo(txid, vout) if utxo is None: return UTXOVerificationResult( valid=False, error="UTXO not found or spent", ) # Verify scriptpubkey matches scriptpubkey_matches = utxo.scriptpubkey.lower() == scriptpubkey.lower() if not scriptpubkey_matches: return UTXOVerificationResult( valid=False, value=utxo.value, confirmations=utxo.confirmations, error="ScriptPubKey mismatch", scriptpubkey_matches=False, ) return UTXOVerificationResult( valid=True, value=utxo.value, confirmations=utxo.confirmations, scriptpubkey_matches=True, )Verify a UTXO using provided metadata (neutrino_compat feature).
This method allows light clients to verify UTXOs without needing arbitrary blockchain queries by using metadata provided by the peer.
The implementation should: 1. Use scriptpubkey to add the UTXO to watch list (for Neutrino) 2. Use blockheight as a hint for efficient rescan 3. Verify the UTXO exists with matching scriptpubkey 4. Return the UTXO value and confirmations
Default implementation falls back to get_utxo() for full node backends.
Args
txid- Transaction ID
vout- Output index
scriptpubkey- Expected scriptPubKey (hex)
blockheight- Block height where UTXO was confirmed
Returns
UTXOVerificationResult with verification status and UTXO data
class NeutrinoBackend (neutrino_url: str = 'http://127.0.0.1:8334',
network: str = 'mainnet',
connect_peers: list[str] | None = None,
data_dir: str = '/data/neutrino')-
Expand source code
class NeutrinoBackend(BlockchainBackend): """ Blockchain backend using Neutrino light client. Neutrino is a privacy-preserving Bitcoin light client that uses BIP157/BIP158 compact block filters instead of traditional SPV. Communication with the neutrino daemon is via REST API. The neutrino daemon should be running alongside this client. """ def __init__( self, neutrino_url: str = "http://127.0.0.1:8334", network: str = "mainnet", connect_peers: list[str] | None = None, data_dir: str = "/data/neutrino", ): """ Initialize Neutrino backend. Args: neutrino_url: URL of the neutrino REST API (default port 8334) network: Bitcoin network (mainnet, testnet, regtest, signet) connect_peers: List of peer addresses to connect to (optional) data_dir: Directory for neutrino data (headers, filters) """ self.neutrino_url = neutrino_url.rstrip("/") self.network = network self.connect_peers = connect_peers or [] self.data_dir = data_dir self.client = httpx.AsyncClient(timeout=60.0) # Cache for watched addresses (neutrino needs to know what to scan for) self._watched_addresses: set[str] = set() self._watched_outpoints: set[tuple[str, int]] = set() # Security limits to prevent DoS via excessive watch list / rescan abuse self._max_watched_addresses: int = 10000 # Maximum addresses to track self._max_rescan_depth: int = 100000 # Maximum blocks to rescan (roughly 2 years) self._min_valid_blockheight: int = 481824 # SegWit activation (mainnet) # For testnet/regtest, this will be adjusted based on network # Block filter cache self._filter_header_tip: int = 0 self._synced: bool = False # Track if we've done the initial rescan self._initial_rescan_done: bool = False # Adjust minimum blockheight based on network if network == "regtest": self._min_valid_blockheight = 0 # Regtest can have any height elif network == "testnet": self._min_valid_blockheight = 834624 # Approximate SegWit on testnet elif network == "signet": self._min_valid_blockheight = 0 # Signet started with SegWit async def _api_call( self, method: str, endpoint: str, params: dict[str, Any] | None = None, data: dict[str, Any] | None = None, ) -> Any: """Make an API call to the neutrino daemon.""" url = f"{self.neutrino_url}/{endpoint}" try: if method == "GET": response = await self.client.get(url, params=params) elif method == "POST": response = await self.client.post(url, json=data) else: raise ValueError(f"Unsupported HTTP method: {method}") response.raise_for_status() return response.json() except httpx.HTTPError as e: logger.error(f"Neutrino API call failed: {endpoint} - {e}") raise async def wait_for_sync(self, timeout: float = 300.0) -> bool: """ Wait for neutrino to sync block headers and filters. Args: timeout: Maximum time to wait in seconds Returns: True if synced, False if timeout """ start_time = asyncio.get_event_loop().time() while True: try: status = await self._api_call("GET", "v1/status") synced = status.get("synced", False) block_height = status.get("block_height", 0) filter_height = status.get("filter_height", 0) if synced and block_height == filter_height: self._synced = True self._filter_header_tip = block_height logger.info(f"Neutrino synced at height {block_height}") return True logger.debug(f"Syncing... blocks: {block_height}, filters: {filter_height}") except Exception as e: logger.warning(f"Waiting for neutrino daemon: {e}") elapsed = asyncio.get_event_loop().time() - start_time if elapsed > timeout: logger.error("Neutrino sync timeout") return False await asyncio.sleep(2.0) async def add_watch_address(self, address: str) -> None: """ Add an address to the local watch list. In neutrino-api v0.4, address watching is implicit - you just query UTXOs or do rescans with the addresses you care about. This method tracks addresses locally for convenience. Security: Limits the number of watched addresses to prevent memory exhaustion attacks. Args: address: Bitcoin address to watch Raises: ValueError: If watch list limit exceeded """ if address in self._watched_addresses: return if len(self._watched_addresses) >= self._max_watched_addresses: logger.warning( f"Watch list limit reached ({self._max_watched_addresses}). " f"Cannot add address: {address[:20]}..." ) raise ValueError(f"Watch list limit ({self._max_watched_addresses}) exceeded") self._watched_addresses.add(address) logger.debug(f"Watching address: {address}") async def add_watch_outpoint(self, txid: str, vout: int) -> None: """ Add an outpoint to the local watch list. In neutrino-api v0.4, outpoint watching is done via UTXO queries with the address parameter. This method tracks outpoints locally. Args: txid: Transaction ID vout: Output index """ outpoint = (txid, vout) if outpoint in self._watched_outpoints: return self._watched_outpoints.add(outpoint) logger.debug(f"Watching outpoint: {txid}:{vout}") async def get_utxos(self, addresses: list[str]) -> list[UTXO]: """ Get UTXOs for given addresses using neutrino's rescan capability. Neutrino will scan the blockchain using compact block filters to find transactions relevant to the watched addresses. On first call, triggers a full blockchain rescan from genesis to ensure all historical UTXOs are found (critical for wallets funded before neutrino started). """ utxos: list[UTXO] = [] # Add addresses to watch list for address in addresses: await self.add_watch_address(address) # On first UTXO query, trigger a full blockchain rescan to find existing UTXOs # This is critical for wallets that were funded before neutrino was watching them logger.debug( f"get_utxos: _initial_rescan_done={self._initial_rescan_done}, " f"watched_addresses={len(self._watched_addresses)}" ) if not self._initial_rescan_done and self._watched_addresses: logger.info( f"Performing initial blockchain rescan for {len(self._watched_addresses)} " "watched addresses (this may take a moment)..." ) try: # Trigger rescan from block 0 for all watched addresses await self._api_call( "POST", "v1/rescan", data={ "addresses": list(self._watched_addresses), "start_height": 0, }, ) # Wait for rescan to complete (neutrino processes this asynchronously) # On regtest with ~3000 blocks, this typically takes 5-10 seconds await asyncio.sleep(10.0) self._initial_rescan_done = True logger.info("Initial blockchain rescan completed") except Exception as e: logger.warning(f"Initial rescan failed (will retry on next sync): {e}") else: # Wait a moment for filter matching to complete await asyncio.sleep(0.5) try: # Request UTXO scan for addresses result = await self._api_call( "POST", "v1/utxos", data={"addresses": addresses}, ) tip_height = await self.get_block_height() for utxo_data in result.get("utxos", []): height = utxo_data.get("height", 0) confirmations = 0 if height > 0: confirmations = tip_height - height + 1 utxo = UTXO( txid=utxo_data["txid"], vout=utxo_data["vout"], value=utxo_data["value"], address=utxo_data.get("address", ""), confirmations=confirmations, scriptpubkey=utxo_data.get("scriptpubkey", ""), height=height if height > 0 else None, ) utxos.append(utxo) logger.debug(f"Found {len(utxos)} UTXOs for {len(addresses)} addresses") except Exception as e: logger.error(f"Failed to fetch UTXOs: {e}") return utxos async def get_address_balance(self, address: str) -> int: """Get balance for an address in satoshis.""" utxos = await self.get_utxos([address]) balance = sum(utxo.value for utxo in utxos) logger.debug(f"Balance for {address}: {balance} sats") return balance async def broadcast_transaction(self, tx_hex: str) -> str: """ Broadcast transaction via neutrino to the P2P network. Neutrino maintains P2P connections and can broadcast transactions directly to connected peers. """ try: result = await self._api_call( "POST", "v1/tx/broadcast", data={"tx_hex": tx_hex}, ) txid = result.get("txid", "") logger.info(f"Broadcast transaction: {txid}") return txid except Exception as e: logger.error(f"Failed to broadcast transaction: {e}") raise ValueError(f"Broadcast failed: {e}") from e async def get_transaction(self, txid: str) -> Transaction | None: """ Get transaction by txid. Note: Neutrino uses compact block filters (BIP158) and can only fetch transactions for addresses it has rescanned. It cannot fetch arbitrary transactions by txid alone. This method always returns None. For verification after broadcast, rely on UTXO checks with known addresses and block heights instead. """ # Neutrino doesn't support fetching arbitrary transactions by txid # It can only work with UTXOs for known addresses via compact filters return None async def verify_tx_output( self, txid: str, vout: int, address: str, start_height: int | None = None, ) -> bool: """ Verify that a specific transaction output exists using neutrino's UTXO endpoint. Uses GET /v1/utxo/{txid}/{vout}?address=...&start_height=... to check if the output exists. This works because neutrino uses compact block filters that can match on addresses. Args: txid: Transaction ID to verify vout: Output index to check address: The address that should own this output start_height: Block height hint for efficient scanning (recommended) Returns: True if the output exists, False otherwise """ try: params: dict[str, str | int] = {"address": address} if start_height is not None: params["start_height"] = start_height result = await self._api_call( "GET", f"v1/utxo/{txid}/{vout}", params=params, ) # If we got a response with unspent status, the output exists # Note: Even spent outputs confirm the transaction was broadcast if result is not None: logger.debug( f"Verified tx output {txid}:{vout} exists " f"(unspent={result.get('unspent', 'unknown')})" ) return True return False except httpx.HTTPStatusError as e: if e.response.status_code == 404: # Output not found logger.debug(f"Tx output {txid}:{vout} not found") return False logger.warning(f"Error verifying tx output {txid}:{vout}: {e}") return False except Exception as e: logger.warning(f"Error verifying tx output {txid}:{vout}: {e}") return False async def estimate_fee(self, target_blocks: int) -> int: """ Estimate fee in sat/vbyte for target confirmation blocks. Neutrino can estimate fees based on observed mempool/block data. Falls back to reasonable defaults if estimation unavailable. """ try: result = await self._api_call( "GET", "v1/fees/estimate", params={"target_blocks": target_blocks}, ) fee_rate = result.get("fee_rate", 0) if fee_rate > 0: logger.debug(f"Estimated fee for {target_blocks} blocks: {fee_rate} sat/vB") return int(fee_rate) except Exception as e: logger.warning(f"Fee estimation failed: {e}") # Fallback fee rates based on target if target_blocks <= 1: return 20 elif target_blocks <= 3: return 10 elif target_blocks <= 6: return 5 else: return 2 async def get_block_height(self) -> int: """Get current blockchain height from neutrino.""" try: result = await self._api_call("GET", "v1/status") height = result.get("block_height", 0) logger.debug(f"Current block height: {height}") return height except Exception as e: logger.error(f"Failed to fetch block height: {e}") raise async def get_block_time(self, block_height: int) -> int: """Get block time (unix timestamp) for given height.""" try: result = await self._api_call( "GET", f"v1/block/{block_height}/header", ) timestamp = result.get("timestamp", 0) logger.debug(f"Block {block_height} timestamp: {timestamp}") return timestamp except Exception as e: logger.error(f"Failed to fetch block time for height {block_height}: {e}") raise async def get_block_hash(self, block_height: int) -> str: """Get block hash for given height.""" try: result = await self._api_call( "GET", f"v1/block/{block_height}/header", ) block_hash = result.get("hash", "") logger.debug(f"Block hash for height {block_height}: {block_hash}") return block_hash except Exception as e: logger.error(f"Failed to fetch block hash for height {block_height}: {e}") raise async def get_utxo(self, txid: str, vout: int) -> UTXO | None: """Get a specific UTXO from the blockchain. Returns None if the UTXO does not exist or has been spent.""" try: result = await self._api_call( "GET", f"v1/utxo/{txid}/{vout}", ) if not result or result.get("spent", False): logger.debug(f"UTXO {txid}:{vout} not found or spent") return None tip_height = await self.get_block_height() height = result.get("height", 0) confirmations = 0 if height > 0: confirmations = tip_height - height + 1 return UTXO( txid=txid, vout=vout, value=result.get("value", 0), address=result.get("address", ""), confirmations=confirmations, scriptpubkey=result.get("scriptpubkey", ""), height=height if height > 0 else None, ) except httpx.HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"UTXO {txid}:{vout} not found") return None logger.error(f"Failed to get UTXO {txid}:{vout}: {e}") return None except Exception as e: logger.error(f"Failed to get UTXO {txid}:{vout}: {e}") return None def requires_neutrino_metadata(self) -> bool: """ Neutrino backend requires metadata for arbitrary UTXO verification. Without scriptPubKey and blockheight hints, Neutrino cannot verify UTXOs that it hasn't been watching from the start. Returns: True - Neutrino always requires metadata for counterparty UTXOs """ return True def can_provide_neutrino_metadata(self) -> bool: """ Neutrino backend cannot reliably provide metadata for all UTXOs. Light clients can only provide metadata for UTXOs they've been watching. They cannot provide metadata for arbitrary addresses like full nodes can. Returns: False - Neutrino cannot provide metadata for arbitrary UTXOs """ return False async def verify_utxo_with_metadata( self, txid: str, vout: int, scriptpubkey: str, blockheight: int, ) -> UTXOVerificationResult: """ Verify a UTXO using provided metadata (neutrino_compat feature). This is the key method that enables Neutrino light clients to verify counterparty UTXOs in CoinJoin without arbitrary blockchain queries. Uses the neutrino-api v0.4 UTXO check endpoint which requires: - address: The Bitcoin address that owns the UTXO (derived from scriptPubKey) - start_height: Block height to start scanning from (for efficiency) The API scans from start_height to chain tip using compact block filters to determine if the UTXO exists and whether it has been spent. Security: Validates blockheight to prevent rescan abuse attacks where malicious peers provide very low blockheights to trigger expensive rescans. Args: txid: Transaction ID vout: Output index scriptpubkey: Expected scriptPubKey (hex) - used to derive address blockheight: Block height where UTXO was confirmed - scan start hint Returns: UTXOVerificationResult with verification status and UTXO data """ # Security: Validate blockheight to prevent rescan abuse tip_height = await self.get_block_height() if blockheight < self._min_valid_blockheight: return UTXOVerificationResult( valid=False, error=f"Blockheight {blockheight} is below minimum valid height " f"{self._min_valid_blockheight} for {self.network}", ) if blockheight > tip_height: return UTXOVerificationResult( valid=False, error=f"Blockheight {blockheight} is in the future (tip: {tip_height})", ) # Limit rescan depth to prevent DoS rescan_depth = tip_height - blockheight if rescan_depth > self._max_rescan_depth: return UTXOVerificationResult( valid=False, error=f"Rescan depth {rescan_depth} exceeds max {self._max_rescan_depth}. " f"UTXO too old for efficient verification.", ) logger.debug( f"Verifying UTXO {txid}:{vout} with metadata " f"(scriptpubkey={scriptpubkey[:20]}..., blockheight={blockheight})" ) # Step 1: Derive address from scriptPubKey # The neutrino-api v0.4 requires the address for UTXO lookup address = self._scriptpubkey_to_address(scriptpubkey) if not address: return UTXOVerificationResult( valid=False, error=f"Could not derive address from scriptPubKey: {scriptpubkey[:40]}...", ) logger.debug(f"Derived address {address} from scriptPubKey") try: # Step 2: Query the specific UTXO using the v0.4 API # GET /v1/utxo/{txid}/{vout}?address=...&start_height=... # # The start_height parameter is critical for performance: # - Scanning 1 block takes ~0.01s # - Scanning 100 blocks takes ~0.5s # - Scanning 10,000+ blocks can take minutes # # We use blockheight - 1 as a safety margin in case of reorgs start_height = max(0, blockheight - 1) result = await self._api_call( "GET", f"v1/utxo/{txid}/{vout}", params={"address": address, "start_height": start_height}, ) # Check if UTXO is unspent if not result.get("unspent", False): spending_txid = result.get("spending_txid", "unknown") spending_height = result.get("spending_height", "unknown") return UTXOVerificationResult( valid=False, error=f"UTXO has been spent in tx {spending_txid} at height {spending_height}", ) # Step 3: Verify scriptPubKey matches actual_scriptpubkey = result.get("scriptpubkey", "") scriptpubkey_matches = actual_scriptpubkey.lower() == scriptpubkey.lower() if not scriptpubkey_matches: return UTXOVerificationResult( valid=False, value=result.get("value", 0), error=f"ScriptPubKey mismatch: expected {scriptpubkey[:20]}..., " f"got {actual_scriptpubkey[:20]}...", scriptpubkey_matches=False, ) # Step 4: Calculate confirmations tip_height = await self.get_block_height() # The blockheight parameter is the confirmation height hint from the peer confirmations = tip_height - blockheight + 1 if blockheight > 0 else 0 logger.info( f"UTXO {txid}:{vout} verified: value={result.get('value', 0)}, " f"confirmations={confirmations}" ) return UTXOVerificationResult( valid=True, value=result.get("value", 0), confirmations=confirmations, scriptpubkey_matches=True, ) except httpx.HTTPStatusError as e: if e.response.status_code == 404: return UTXOVerificationResult( valid=False, error="UTXO not found - may not exist or address derivation failed", ) return UTXOVerificationResult( valid=False, error=f"UTXO query failed: {e}", ) except Exception as e: return UTXOVerificationResult( valid=False, error=f"Verification failed: {e}", ) def _scriptpubkey_to_address(self, scriptpubkey: str) -> str | None: """ Convert scriptPubKey to address for watch list. Supports common script types: - P2WPKH: 0014<20-byte-hash> -> bc1q... - P2WSH: 0020<32-byte-hash> -> bc1q... - P2PKH: 76a914<20-byte-hash>88ac -> 1... - P2SH: a914<20-byte-hash>87 -> 3... Args: scriptpubkey: Hex-encoded scriptPubKey Returns: Bitcoin address or None if conversion fails """ try: script_bytes = bytes.fromhex(scriptpubkey) # P2WPKH: OP_0 <20 bytes> if len(script_bytes) == 22 and script_bytes[0] == 0x00 and script_bytes[1] == 0x14: # Use bech32 encoding return self._encode_bech32_address(script_bytes[2:], 0) # P2WSH: OP_0 <32 bytes> if len(script_bytes) == 34 and script_bytes[0] == 0x00 and script_bytes[1] == 0x20: return self._encode_bech32_address(script_bytes[2:], 0) # P2PKH: OP_DUP OP_HASH160 <20 bytes> OP_EQUALVERIFY OP_CHECKSIG if ( len(script_bytes) == 25 and script_bytes[0] == 0x76 and script_bytes[1] == 0xA9 and script_bytes[2] == 0x14 and script_bytes[23] == 0x88 and script_bytes[24] == 0xAC ): return self._encode_base58check_address(script_bytes[3:23], 0x00) # P2SH: OP_HASH160 <20 bytes> OP_EQUAL if ( len(script_bytes) == 23 and script_bytes[0] == 0xA9 and script_bytes[1] == 0x14 and script_bytes[22] == 0x87 ): return self._encode_base58check_address(script_bytes[2:22], 0x05) logger.warning(f"Unknown scriptPubKey format: {scriptpubkey[:20]}...") return None except Exception as e: logger.warning(f"Failed to convert scriptPubKey to address: {e}") return None def _encode_bech32_address(self, witness_program: bytes, witness_version: int) -> str: """Encode witness program as bech32 address.""" # Simplified bech32 encoding - in production use a proper library hrp = "bc" if self.network == "mainnet" else "bcrt" if self.network == "regtest" else "tb" # Convert witness program to 5-bit groups def convertbits(data: bytes, frombits: int, tobits: int, pad: bool = True) -> list[int]: acc = 0 bits = 0 ret = [] maxv = (1 << tobits) - 1 for value in data: acc = (acc << frombits) | value bits += frombits while bits >= tobits: bits -= tobits ret.append((acc >> bits) & maxv) if pad and bits: ret.append((acc << (tobits - bits)) & maxv) return ret charset = "qpzry9x8gf2tvdw0s3jn54khce6mua7l" def bech32_polymod(values: list[int]) -> int: gen = [0x3B6A57B2, 0x26508E6D, 0x1EA119FA, 0x3D4233DD, 0x2A1462B3] chk = 1 for v in values: b = chk >> 25 chk = ((chk & 0x1FFFFFF) << 5) ^ v for i in range(5): chk ^= gen[i] if ((b >> i) & 1) else 0 return chk def bech32_hrp_expand(hrp: str) -> list[int]: return [ord(x) >> 5 for x in hrp] + [0] + [ord(x) & 31 for x in hrp] def bech32_create_checksum(hrp: str, data: list[int]) -> list[int]: values = bech32_hrp_expand(hrp) + data polymod = bech32_polymod(values + [0, 0, 0, 0, 0, 0]) ^ 1 return [(polymod >> 5 * (5 - i)) & 31 for i in range(6)] data = [witness_version] + convertbits(witness_program, 8, 5) checksum = bech32_create_checksum(hrp, data) return hrp + "1" + "".join(charset[d] for d in data + checksum) def _encode_base58check_address(self, payload: bytes, version: int) -> str: """Encode payload as base58check address.""" import hashlib versioned = bytes([version]) + payload checksum = hashlib.sha256(hashlib.sha256(versioned).digest()).digest()[:4] data = versioned + checksum ALPHABET = "123456789ABCDEFGHJKLMNPQRSTUVWXYZabcdefghijkmnopqrstuvwxyz" # noqa: N806 n = int.from_bytes(data, "big") result = "" while n > 0: n, r = divmod(n, 58) result = ALPHABET[r] + result # Add leading zeros for byte in data: if byte == 0: result = "1" + result else: break return result async def get_filter_header(self, block_height: int) -> str: """ Get compact block filter header for given height. BIP157 filter headers form a chain for validation. """ try: result = await self._api_call( "GET", f"v1/block/{block_height}/filter_header", ) return result.get("filter_header", "") except Exception as e: logger.error(f"Failed to fetch filter header for height {block_height}: {e}") raise async def get_connected_peers(self) -> list[dict[str, Any]]: """Get list of connected P2P peers.""" try: result = await self._api_call("GET", "v1/peers") return result.get("peers", []) except Exception as e: logger.warning(f"Failed to fetch peers: {e}") return [] async def rescan_from_height( self, start_height: int, addresses: list[str] | None = None, outpoints: list[tuple[str, int]] | None = None, ) -> None: """ Rescan blockchain from a specific height for addresses. This triggers neutrino to re-check compact block filters from the specified height for relevant transactions. Uses the neutrino-api v0.4 rescan endpoint: POST /v1/rescan with {"start_height": N, "addresses": [...]} Note: The v0.4 API only supports address-based rescans. Outpoints are tracked via address watches instead. Args: start_height: Block height to start rescan from addresses: List of addresses to scan for (required for v0.4) outpoints: List of (txid, vout) outpoints - not directly supported, will be ignored (use add_watch_outpoint instead) Raises: ValueError: If start_height is invalid or rescan depth exceeds limits """ if not addresses: logger.warning("Rescan called without addresses - nothing to scan") return # Security: Validate start_height to prevent rescan abuse if start_height < self._min_valid_blockheight: raise ValueError( f"start_height {start_height} is below minimum valid height " f"{self._min_valid_blockheight} for {self.network}" ) tip_height = await self.get_block_height() if start_height > tip_height: raise ValueError(f"start_height {start_height} is in the future (tip: {tip_height})") rescan_depth = tip_height - start_height if rescan_depth > self._max_rescan_depth: raise ValueError( f"Rescan depth {rescan_depth} exceeds maximum {self._max_rescan_depth} blocks" ) # Track addresses locally (with limit check) for addr in addresses: await self.add_watch_address(addr) # Note: v0.4 API doesn't support outpoints in rescan if outpoints: logger.debug( "Outpoints parameter ignored in v0.4 rescan API. " "Use address-based watching instead." ) for txid, vout in outpoints: self._watched_outpoints.add((txid, vout)) try: await self._api_call( "POST", "v1/rescan", data={ "start_height": start_height, "addresses": addresses, }, ) logger.info(f"Started rescan from height {start_height} for {len(addresses)} addresses") except Exception as e: logger.error(f"Failed to start rescan: {e}") raise async def close(self) -> None: """Close the HTTP client connection.""" await self.client.aclose()Blockchain backend using Neutrino light client.
Neutrino is a privacy-preserving Bitcoin light client that uses BIP157/BIP158 compact block filters instead of traditional SPV.
Communication with the neutrino daemon is via REST API. The neutrino daemon should be running alongside this client.
Initialize Neutrino backend.
Args
neutrino_url- URL of the neutrino REST API (default port 8334)
network- Bitcoin network (mainnet, testnet, regtest, signet)
connect_peers- List of peer addresses to connect to (optional)
data_dir- Directory for neutrino data (headers, filters)
Ancestors
- BlockchainBackend
- abc.ABC
Methods
async def add_watch_address(self, address: str) ‑> None-
Expand source code
async def add_watch_address(self, address: str) -> None: """ Add an address to the local watch list. In neutrino-api v0.4, address watching is implicit - you just query UTXOs or do rescans with the addresses you care about. This method tracks addresses locally for convenience. Security: Limits the number of watched addresses to prevent memory exhaustion attacks. Args: address: Bitcoin address to watch Raises: ValueError: If watch list limit exceeded """ if address in self._watched_addresses: return if len(self._watched_addresses) >= self._max_watched_addresses: logger.warning( f"Watch list limit reached ({self._max_watched_addresses}). " f"Cannot add address: {address[:20]}..." ) raise ValueError(f"Watch list limit ({self._max_watched_addresses}) exceeded") self._watched_addresses.add(address) logger.debug(f"Watching address: {address}")Add an address to the local watch list.
In neutrino-api v0.4, address watching is implicit - you just query UTXOs or do rescans with the addresses you care about. This method tracks addresses locally for convenience.
Security: Limits the number of watched addresses to prevent memory exhaustion attacks.
Args
address- Bitcoin address to watch
Raises
ValueError- If watch list limit exceeded
async def add_watch_outpoint(self, txid: str, vout: int) ‑> None-
Expand source code
async def add_watch_outpoint(self, txid: str, vout: int) -> None: """ Add an outpoint to the local watch list. In neutrino-api v0.4, outpoint watching is done via UTXO queries with the address parameter. This method tracks outpoints locally. Args: txid: Transaction ID vout: Output index """ outpoint = (txid, vout) if outpoint in self._watched_outpoints: return self._watched_outpoints.add(outpoint) logger.debug(f"Watching outpoint: {txid}:{vout}")Add an outpoint to the local watch list.
In neutrino-api v0.4, outpoint watching is done via UTXO queries with the address parameter. This method tracks outpoints locally.
Args
txid- Transaction ID
vout- Output index
async def broadcast_transaction(self, tx_hex: str) ‑> str-
Expand source code
async def broadcast_transaction(self, tx_hex: str) -> str: """ Broadcast transaction via neutrino to the P2P network. Neutrino maintains P2P connections and can broadcast transactions directly to connected peers. """ try: result = await self._api_call( "POST", "v1/tx/broadcast", data={"tx_hex": tx_hex}, ) txid = result.get("txid", "") logger.info(f"Broadcast transaction: {txid}") return txid except Exception as e: logger.error(f"Failed to broadcast transaction: {e}") raise ValueError(f"Broadcast failed: {e}") from eBroadcast transaction via neutrino to the P2P network.
Neutrino maintains P2P connections and can broadcast transactions directly to connected peers.
def can_provide_neutrino_metadata(self) ‑> bool-
Expand source code
def can_provide_neutrino_metadata(self) -> bool: """ Neutrino backend cannot reliably provide metadata for all UTXOs. Light clients can only provide metadata for UTXOs they've been watching. They cannot provide metadata for arbitrary addresses like full nodes can. Returns: False - Neutrino cannot provide metadata for arbitrary UTXOs """ return FalseNeutrino backend cannot reliably provide metadata for all UTXOs.
Light clients can only provide metadata for UTXOs they've been watching. They cannot provide metadata for arbitrary addresses like full nodes can.
Returns
False - Neutrino cannot provide metadata for arbitrary UTXOs
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: """Close the HTTP client connection.""" await self.client.aclose()Close the HTTP client connection.
async def estimate_fee(self, target_blocks: int) ‑> int-
Expand source code
async def estimate_fee(self, target_blocks: int) -> int: """ Estimate fee in sat/vbyte for target confirmation blocks. Neutrino can estimate fees based on observed mempool/block data. Falls back to reasonable defaults if estimation unavailable. """ try: result = await self._api_call( "GET", "v1/fees/estimate", params={"target_blocks": target_blocks}, ) fee_rate = result.get("fee_rate", 0) if fee_rate > 0: logger.debug(f"Estimated fee for {target_blocks} blocks: {fee_rate} sat/vB") return int(fee_rate) except Exception as e: logger.warning(f"Fee estimation failed: {e}") # Fallback fee rates based on target if target_blocks <= 1: return 20 elif target_blocks <= 3: return 10 elif target_blocks <= 6: return 5 else: return 2Estimate fee in sat/vbyte for target confirmation blocks.
Neutrino can estimate fees based on observed mempool/block data. Falls back to reasonable defaults if estimation unavailable.
async def get_address_balance(self, address: str) ‑> int-
Expand source code
async def get_address_balance(self, address: str) -> int: """Get balance for an address in satoshis.""" utxos = await self.get_utxos([address]) balance = sum(utxo.value for utxo in utxos) logger.debug(f"Balance for {address}: {balance} sats") return balanceGet balance for an address in satoshis.
async def get_block_hash(self, block_height: int) ‑> str-
Expand source code
async def get_block_hash(self, block_height: int) -> str: """Get block hash for given height.""" try: result = await self._api_call( "GET", f"v1/block/{block_height}/header", ) block_hash = result.get("hash", "") logger.debug(f"Block hash for height {block_height}: {block_hash}") return block_hash except Exception as e: logger.error(f"Failed to fetch block hash for height {block_height}: {e}") raiseGet block hash for given height.
async def get_block_height(self) ‑> int-
Expand source code
async def get_block_height(self) -> int: """Get current blockchain height from neutrino.""" try: result = await self._api_call("GET", "v1/status") height = result.get("block_height", 0) logger.debug(f"Current block height: {height}") return height except Exception as e: logger.error(f"Failed to fetch block height: {e}") raiseGet current blockchain height from neutrino.
async def get_block_time(self, block_height: int) ‑> int-
Expand source code
async def get_block_time(self, block_height: int) -> int: """Get block time (unix timestamp) for given height.""" try: result = await self._api_call( "GET", f"v1/block/{block_height}/header", ) timestamp = result.get("timestamp", 0) logger.debug(f"Block {block_height} timestamp: {timestamp}") return timestamp except Exception as e: logger.error(f"Failed to fetch block time for height {block_height}: {e}") raiseGet block time (unix timestamp) for given height.
async def get_connected_peers(self) ‑> list[dict[str, typing.Any]]-
Expand source code
async def get_connected_peers(self) -> list[dict[str, Any]]: """Get list of connected P2P peers.""" try: result = await self._api_call("GET", "v1/peers") return result.get("peers", []) except Exception as e: logger.warning(f"Failed to fetch peers: {e}") return []Get list of connected P2P peers.
async def get_filter_header(self, block_height: int) ‑> str-
Expand source code
async def get_filter_header(self, block_height: int) -> str: """ Get compact block filter header for given height. BIP157 filter headers form a chain for validation. """ try: result = await self._api_call( "GET", f"v1/block/{block_height}/filter_header", ) return result.get("filter_header", "") except Exception as e: logger.error(f"Failed to fetch filter header for height {block_height}: {e}") raiseGet compact block filter header for given height.
BIP157 filter headers form a chain for validation.
async def get_transaction(self, txid: str) ‑> Transaction | None-
Expand source code
async def get_transaction(self, txid: str) -> Transaction | None: """ Get transaction by txid. Note: Neutrino uses compact block filters (BIP158) and can only fetch transactions for addresses it has rescanned. It cannot fetch arbitrary transactions by txid alone. This method always returns None. For verification after broadcast, rely on UTXO checks with known addresses and block heights instead. """ # Neutrino doesn't support fetching arbitrary transactions by txid # It can only work with UTXOs for known addresses via compact filters return NoneGet transaction by txid.
Note: Neutrino uses compact block filters (BIP158) and can only fetch transactions for addresses it has rescanned. It cannot fetch arbitrary transactions by txid alone. This method always returns None.
For verification after broadcast, rely on UTXO checks with known addresses and block heights instead.
async def get_utxo(self, txid: str, vout: int) ‑> UTXO | None-
Expand source code
async def get_utxo(self, txid: str, vout: int) -> UTXO | None: """Get a specific UTXO from the blockchain. Returns None if the UTXO does not exist or has been spent.""" try: result = await self._api_call( "GET", f"v1/utxo/{txid}/{vout}", ) if not result or result.get("spent", False): logger.debug(f"UTXO {txid}:{vout} not found or spent") return None tip_height = await self.get_block_height() height = result.get("height", 0) confirmations = 0 if height > 0: confirmations = tip_height - height + 1 return UTXO( txid=txid, vout=vout, value=result.get("value", 0), address=result.get("address", ""), confirmations=confirmations, scriptpubkey=result.get("scriptpubkey", ""), height=height if height > 0 else None, ) except httpx.HTTPStatusError as e: if e.response.status_code == 404: logger.debug(f"UTXO {txid}:{vout} not found") return None logger.error(f"Failed to get UTXO {txid}:{vout}: {e}") return None except Exception as e: logger.error(f"Failed to get UTXO {txid}:{vout}: {e}") return NoneGet a specific UTXO from the blockchain. Returns None if the UTXO does not exist or has been spent.
async def get_utxos(self, addresses: list[str]) ‑> list[UTXO]-
Expand source code
async def get_utxos(self, addresses: list[str]) -> list[UTXO]: """ Get UTXOs for given addresses using neutrino's rescan capability. Neutrino will scan the blockchain using compact block filters to find transactions relevant to the watched addresses. On first call, triggers a full blockchain rescan from genesis to ensure all historical UTXOs are found (critical for wallets funded before neutrino started). """ utxos: list[UTXO] = [] # Add addresses to watch list for address in addresses: await self.add_watch_address(address) # On first UTXO query, trigger a full blockchain rescan to find existing UTXOs # This is critical for wallets that were funded before neutrino was watching them logger.debug( f"get_utxos: _initial_rescan_done={self._initial_rescan_done}, " f"watched_addresses={len(self._watched_addresses)}" ) if not self._initial_rescan_done and self._watched_addresses: logger.info( f"Performing initial blockchain rescan for {len(self._watched_addresses)} " "watched addresses (this may take a moment)..." ) try: # Trigger rescan from block 0 for all watched addresses await self._api_call( "POST", "v1/rescan", data={ "addresses": list(self._watched_addresses), "start_height": 0, }, ) # Wait for rescan to complete (neutrino processes this asynchronously) # On regtest with ~3000 blocks, this typically takes 5-10 seconds await asyncio.sleep(10.0) self._initial_rescan_done = True logger.info("Initial blockchain rescan completed") except Exception as e: logger.warning(f"Initial rescan failed (will retry on next sync): {e}") else: # Wait a moment for filter matching to complete await asyncio.sleep(0.5) try: # Request UTXO scan for addresses result = await self._api_call( "POST", "v1/utxos", data={"addresses": addresses}, ) tip_height = await self.get_block_height() for utxo_data in result.get("utxos", []): height = utxo_data.get("height", 0) confirmations = 0 if height > 0: confirmations = tip_height - height + 1 utxo = UTXO( txid=utxo_data["txid"], vout=utxo_data["vout"], value=utxo_data["value"], address=utxo_data.get("address", ""), confirmations=confirmations, scriptpubkey=utxo_data.get("scriptpubkey", ""), height=height if height > 0 else None, ) utxos.append(utxo) logger.debug(f"Found {len(utxos)} UTXOs for {len(addresses)} addresses") except Exception as e: logger.error(f"Failed to fetch UTXOs: {e}") return utxosGet UTXOs for given addresses using neutrino's rescan capability.
Neutrino will scan the blockchain using compact block filters to find transactions relevant to the watched addresses.
On first call, triggers a full blockchain rescan from genesis to ensure all historical UTXOs are found (critical for wallets funded before neutrino started).
def requires_neutrino_metadata(self) ‑> bool-
Expand source code
def requires_neutrino_metadata(self) -> bool: """ Neutrino backend requires metadata for arbitrary UTXO verification. Without scriptPubKey and blockheight hints, Neutrino cannot verify UTXOs that it hasn't been watching from the start. Returns: True - Neutrino always requires metadata for counterparty UTXOs """ return TrueNeutrino backend requires metadata for arbitrary UTXO verification.
Without scriptPubKey and blockheight hints, Neutrino cannot verify UTXOs that it hasn't been watching from the start.
Returns
True - Neutrino always requires metadata for counterparty UTXOs
async def rescan_from_height(self,
start_height: int,
addresses: list[str] | None = None,
outpoints: list[tuple[str, int]] | None = None) ‑> None-
Expand source code
async def rescan_from_height( self, start_height: int, addresses: list[str] | None = None, outpoints: list[tuple[str, int]] | None = None, ) -> None: """ Rescan blockchain from a specific height for addresses. This triggers neutrino to re-check compact block filters from the specified height for relevant transactions. Uses the neutrino-api v0.4 rescan endpoint: POST /v1/rescan with {"start_height": N, "addresses": [...]} Note: The v0.4 API only supports address-based rescans. Outpoints are tracked via address watches instead. Args: start_height: Block height to start rescan from addresses: List of addresses to scan for (required for v0.4) outpoints: List of (txid, vout) outpoints - not directly supported, will be ignored (use add_watch_outpoint instead) Raises: ValueError: If start_height is invalid or rescan depth exceeds limits """ if not addresses: logger.warning("Rescan called without addresses - nothing to scan") return # Security: Validate start_height to prevent rescan abuse if start_height < self._min_valid_blockheight: raise ValueError( f"start_height {start_height} is below minimum valid height " f"{self._min_valid_blockheight} for {self.network}" ) tip_height = await self.get_block_height() if start_height > tip_height: raise ValueError(f"start_height {start_height} is in the future (tip: {tip_height})") rescan_depth = tip_height - start_height if rescan_depth > self._max_rescan_depth: raise ValueError( f"Rescan depth {rescan_depth} exceeds maximum {self._max_rescan_depth} blocks" ) # Track addresses locally (with limit check) for addr in addresses: await self.add_watch_address(addr) # Note: v0.4 API doesn't support outpoints in rescan if outpoints: logger.debug( "Outpoints parameter ignored in v0.4 rescan API. " "Use address-based watching instead." ) for txid, vout in outpoints: self._watched_outpoints.add((txid, vout)) try: await self._api_call( "POST", "v1/rescan", data={ "start_height": start_height, "addresses": addresses, }, ) logger.info(f"Started rescan from height {start_height} for {len(addresses)} addresses") except Exception as e: logger.error(f"Failed to start rescan: {e}") raiseRescan blockchain from a specific height for addresses.
This triggers neutrino to re-check compact block filters from the specified height for relevant transactions.
Uses the neutrino-api v0.4 rescan endpoint: POST /v1/rescan with {"start_height": N, "addresses": […]}
Note: The v0.4 API only supports address-based rescans. Outpoints are tracked via address watches instead.
Args
start_height- Block height to start rescan from
addresses- List of addresses to scan for (required for v0.4)
outpoints- List of (txid, vout) outpoints - not directly supported, will be ignored (use add_watch_outpoint instead)
Raises
ValueError- If start_height is invalid or rescan depth exceeds limits
async def verify_tx_output(self, txid: str, vout: int, address: str, start_height: int | None = None) ‑> bool-
Expand source code
async def verify_tx_output( self, txid: str, vout: int, address: str, start_height: int | None = None, ) -> bool: """ Verify that a specific transaction output exists using neutrino's UTXO endpoint. Uses GET /v1/utxo/{txid}/{vout}?address=...&start_height=... to check if the output exists. This works because neutrino uses compact block filters that can match on addresses. Args: txid: Transaction ID to verify vout: Output index to check address: The address that should own this output start_height: Block height hint for efficient scanning (recommended) Returns: True if the output exists, False otherwise """ try: params: dict[str, str | int] = {"address": address} if start_height is not None: params["start_height"] = start_height result = await self._api_call( "GET", f"v1/utxo/{txid}/{vout}", params=params, ) # If we got a response with unspent status, the output exists # Note: Even spent outputs confirm the transaction was broadcast if result is not None: logger.debug( f"Verified tx output {txid}:{vout} exists " f"(unspent={result.get('unspent', 'unknown')})" ) return True return False except httpx.HTTPStatusError as e: if e.response.status_code == 404: # Output not found logger.debug(f"Tx output {txid}:{vout} not found") return False logger.warning(f"Error verifying tx output {txid}:{vout}: {e}") return False except Exception as e: logger.warning(f"Error verifying tx output {txid}:{vout}: {e}") return FalseVerify that a specific transaction output exists using neutrino's UTXO endpoint.
Uses GET /v1/utxo/{txid}/{vout}?address=…&start_height=… to check if the output exists. This works because neutrino uses compact block filters that can match on addresses.
Args
txid- Transaction ID to verify
vout- Output index to check
address- The address that should own this output
start_height- Block height hint for efficient scanning (recommended)
Returns
True if the output exists, False otherwise
async def verify_utxo_with_metadata(self, txid: str, vout: int, scriptpubkey: str, blockheight: int) ‑> UTXOVerificationResult-
Expand source code
async def verify_utxo_with_metadata( self, txid: str, vout: int, scriptpubkey: str, blockheight: int, ) -> UTXOVerificationResult: """ Verify a UTXO using provided metadata (neutrino_compat feature). This is the key method that enables Neutrino light clients to verify counterparty UTXOs in CoinJoin without arbitrary blockchain queries. Uses the neutrino-api v0.4 UTXO check endpoint which requires: - address: The Bitcoin address that owns the UTXO (derived from scriptPubKey) - start_height: Block height to start scanning from (for efficiency) The API scans from start_height to chain tip using compact block filters to determine if the UTXO exists and whether it has been spent. Security: Validates blockheight to prevent rescan abuse attacks where malicious peers provide very low blockheights to trigger expensive rescans. Args: txid: Transaction ID vout: Output index scriptpubkey: Expected scriptPubKey (hex) - used to derive address blockheight: Block height where UTXO was confirmed - scan start hint Returns: UTXOVerificationResult with verification status and UTXO data """ # Security: Validate blockheight to prevent rescan abuse tip_height = await self.get_block_height() if blockheight < self._min_valid_blockheight: return UTXOVerificationResult( valid=False, error=f"Blockheight {blockheight} is below minimum valid height " f"{self._min_valid_blockheight} for {self.network}", ) if blockheight > tip_height: return UTXOVerificationResult( valid=False, error=f"Blockheight {blockheight} is in the future (tip: {tip_height})", ) # Limit rescan depth to prevent DoS rescan_depth = tip_height - blockheight if rescan_depth > self._max_rescan_depth: return UTXOVerificationResult( valid=False, error=f"Rescan depth {rescan_depth} exceeds max {self._max_rescan_depth}. " f"UTXO too old for efficient verification.", ) logger.debug( f"Verifying UTXO {txid}:{vout} with metadata " f"(scriptpubkey={scriptpubkey[:20]}..., blockheight={blockheight})" ) # Step 1: Derive address from scriptPubKey # The neutrino-api v0.4 requires the address for UTXO lookup address = self._scriptpubkey_to_address(scriptpubkey) if not address: return UTXOVerificationResult( valid=False, error=f"Could not derive address from scriptPubKey: {scriptpubkey[:40]}...", ) logger.debug(f"Derived address {address} from scriptPubKey") try: # Step 2: Query the specific UTXO using the v0.4 API # GET /v1/utxo/{txid}/{vout}?address=...&start_height=... # # The start_height parameter is critical for performance: # - Scanning 1 block takes ~0.01s # - Scanning 100 blocks takes ~0.5s # - Scanning 10,000+ blocks can take minutes # # We use blockheight - 1 as a safety margin in case of reorgs start_height = max(0, blockheight - 1) result = await self._api_call( "GET", f"v1/utxo/{txid}/{vout}", params={"address": address, "start_height": start_height}, ) # Check if UTXO is unspent if not result.get("unspent", False): spending_txid = result.get("spending_txid", "unknown") spending_height = result.get("spending_height", "unknown") return UTXOVerificationResult( valid=False, error=f"UTXO has been spent in tx {spending_txid} at height {spending_height}", ) # Step 3: Verify scriptPubKey matches actual_scriptpubkey = result.get("scriptpubkey", "") scriptpubkey_matches = actual_scriptpubkey.lower() == scriptpubkey.lower() if not scriptpubkey_matches: return UTXOVerificationResult( valid=False, value=result.get("value", 0), error=f"ScriptPubKey mismatch: expected {scriptpubkey[:20]}..., " f"got {actual_scriptpubkey[:20]}...", scriptpubkey_matches=False, ) # Step 4: Calculate confirmations tip_height = await self.get_block_height() # The blockheight parameter is the confirmation height hint from the peer confirmations = tip_height - blockheight + 1 if blockheight > 0 else 0 logger.info( f"UTXO {txid}:{vout} verified: value={result.get('value', 0)}, " f"confirmations={confirmations}" ) return UTXOVerificationResult( valid=True, value=result.get("value", 0), confirmations=confirmations, scriptpubkey_matches=True, ) except httpx.HTTPStatusError as e: if e.response.status_code == 404: return UTXOVerificationResult( valid=False, error="UTXO not found - may not exist or address derivation failed", ) return UTXOVerificationResult( valid=False, error=f"UTXO query failed: {e}", ) except Exception as e: return UTXOVerificationResult( valid=False, error=f"Verification failed: {e}", )Verify a UTXO using provided metadata (neutrino_compat feature).
This is the key method that enables Neutrino light clients to verify counterparty UTXOs in CoinJoin without arbitrary blockchain queries.
Uses the neutrino-api v0.4 UTXO check endpoint which requires: - address: The Bitcoin address that owns the UTXO (derived from scriptPubKey) - start_height: Block height to start scanning from (for efficiency)
The API scans from start_height to chain tip using compact block filters to determine if the UTXO exists and whether it has been spent.
Security: Validates blockheight to prevent rescan abuse attacks where malicious peers provide very low blockheights to trigger expensive rescans.
Args
txid- Transaction ID
vout- Output index
scriptpubkey- Expected scriptPubKey (hex) - used to derive address
blockheight- Block height where UTXO was confirmed - scan start hint
Returns
UTXOVerificationResult with verification status and UTXO data
async def wait_for_sync(self, timeout: float = 300.0) ‑> bool-
Expand source code
async def wait_for_sync(self, timeout: float = 300.0) -> bool: """ Wait for neutrino to sync block headers and filters. Args: timeout: Maximum time to wait in seconds Returns: True if synced, False if timeout """ start_time = asyncio.get_event_loop().time() while True: try: status = await self._api_call("GET", "v1/status") synced = status.get("synced", False) block_height = status.get("block_height", 0) filter_height = status.get("filter_height", 0) if synced and block_height == filter_height: self._synced = True self._filter_header_tip = block_height logger.info(f"Neutrino synced at height {block_height}") return True logger.debug(f"Syncing... blocks: {block_height}, filters: {filter_height}") except Exception as e: logger.warning(f"Waiting for neutrino daemon: {e}") elapsed = asyncio.get_event_loop().time() - start_time if elapsed > timeout: logger.error("Neutrino sync timeout") return False await asyncio.sleep(2.0)Wait for neutrino to sync block headers and filters.
Args
timeout- Maximum time to wait in seconds
Returns
True if synced, False if timeout
Inherited members
class NeutrinoConfig (network: str = 'mainnet',
data_dir: str = '/data/neutrino',
listen_port: int = 8334,
peers: list[str] | None = None,
tor_socks: str | None = None)-
Expand source code
class NeutrinoConfig: """ Configuration for running a neutrino daemon. This configuration can be used to start a neutrino process programmatically or generate a config file. """ def __init__( self, network: str = "mainnet", data_dir: str = "/data/neutrino", listen_port: int = 8334, peers: list[str] | None = None, tor_socks: str | None = None, ): """ Initialize neutrino configuration. Args: network: Bitcoin network (mainnet, testnet, regtest, signet) data_dir: Directory for neutrino data listen_port: Port for REST API peers: List of peer addresses to connect to tor_socks: Tor SOCKS5 proxy address (e.g., "127.0.0.1:9050") """ self.network = network self.data_dir = data_dir self.listen_port = listen_port self.peers = peers or [] self.tor_socks = tor_socks def get_chain_params(self) -> dict[str, Any]: """Get chain-specific parameters.""" params = { "mainnet": { "default_port": 8333, "dns_seeds": [ "seed.bitcoin.sipa.be", "dnsseed.bluematt.me", "dnsseed.bitcoin.dashjr.org", "seed.bitcoinstats.com", "seed.bitcoin.jonasschnelli.ch", "seed.btc.petertodd.net", ], }, "testnet": { "default_port": 18333, "dns_seeds": [ "testnet-seed.bitcoin.jonasschnelli.ch", "seed.tbtc.petertodd.net", "testnet-seed.bluematt.me", ], }, "signet": { "default_port": 38333, "dns_seeds": [ "seed.signet.bitcoin.sprovoost.nl", ], }, "regtest": { "default_port": 18444, "dns_seeds": [], }, } return params.get(self.network, params["mainnet"]) def to_args(self) -> list[str]: """Generate command-line arguments for neutrino daemon.""" args = [ f"--datadir={self.data_dir}", f"--{self.network}", f"--restlisten=0.0.0.0:{self.listen_port}", ] if self.tor_socks: args.append(f"--proxy={self.tor_socks}") for peer in self.peers: args.append(f"--connect={peer}") return argsConfiguration for running a neutrino daemon.
This configuration can be used to start a neutrino process programmatically or generate a config file.
Initialize neutrino configuration.
Args
network- Bitcoin network (mainnet, testnet, regtest, signet)
data_dir- Directory for neutrino data
listen_port- Port for REST API
peers- List of peer addresses to connect to
tor_socks- Tor SOCKS5 proxy address (e.g., "127.0.0.1:9050")
Methods
def get_chain_params(self) ‑> dict[str, typing.Any]-
Expand source code
def get_chain_params(self) -> dict[str, Any]: """Get chain-specific parameters.""" params = { "mainnet": { "default_port": 8333, "dns_seeds": [ "seed.bitcoin.sipa.be", "dnsseed.bluematt.me", "dnsseed.bitcoin.dashjr.org", "seed.bitcoinstats.com", "seed.bitcoin.jonasschnelli.ch", "seed.btc.petertodd.net", ], }, "testnet": { "default_port": 18333, "dns_seeds": [ "testnet-seed.bitcoin.jonasschnelli.ch", "seed.tbtc.petertodd.net", "testnet-seed.bluematt.me", ], }, "signet": { "default_port": 38333, "dns_seeds": [ "seed.signet.bitcoin.sprovoost.nl", ], }, "regtest": { "default_port": 18444, "dns_seeds": [], }, } return params.get(self.network, params["mainnet"])Get chain-specific parameters.
def to_args(self) ‑> list[str]-
Expand source code
def to_args(self) -> list[str]: """Generate command-line arguments for neutrino daemon.""" args = [ f"--datadir={self.data_dir}", f"--{self.network}", f"--restlisten=0.0.0.0:{self.listen_port}", ] if self.tor_socks: args.append(f"--proxy={self.tor_socks}") for peer in self.peers: args.append(f"--connect={peer}") return argsGenerate command-line arguments for neutrino daemon.
class Transaction (*args: Any, **kwargs: Any)-
Expand source code
@dataclass class Transaction: txid: str raw: str confirmations: int block_height: int | None = None block_time: int | None = NoneInstance variables
var block_height : int | None-
The type of the None singleton.
var block_time : int | None-
The type of the None singleton.
var confirmations : int-
The type of the None singleton.
var raw : str-
The type of the None singleton.
var txid : str-
The type of the None singleton.
class UTXO (*args: Any, **kwargs: Any)-
Expand source code
@dataclass class UTXO: txid: str vout: int value: int address: str confirmations: int scriptpubkey: str height: int | None = NoneInstance variables
var address : str-
The type of the None singleton.
var confirmations : int-
The type of the None singleton.
var height : int | None-
The type of the None singleton.
var scriptpubkey : str-
The type of the None singleton.
var txid : str-
The type of the None singleton.
var value : int-
The type of the None singleton.
var vout : int-
The type of the None singleton.
class UTXOVerificationResult (*args: Any, **kwargs: Any)-
Expand source code
@dataclass class UTXOVerificationResult: """ Result of UTXO verification with metadata. Used by neutrino_compat feature for Neutrino-compatible verification. """ valid: bool value: int = 0 confirmations: int = 0 error: str | None = None scriptpubkey_matches: bool = FalseResult of UTXO verification with metadata.
Used by neutrino_compat feature for Neutrino-compatible verification.
Instance variables
var confirmations : int-
The type of the None singleton.
var error : str | None-
The type of the None singleton.
var scriptpubkey_matches : bool-
The type of the None singleton.
var valid : bool-
The type of the None singleton.
var value : int-
The type of the None singleton.