Module jmwallet.backends.bitcoin_core
Bitcoin Core RPC blockchain backend. Uses RPC calls but NOT wallet functionality (no BDB dependency).
Classes
class BitcoinCoreBackend (rpc_url: str = 'http://127.0.0.1:18443',
rpc_user: str = 'rpcuser',
rpc_password: str = 'rpcpassword',
scan_timeout: float = 300.0)-
Expand source code
class BitcoinCoreBackend(BlockchainBackend): """ Blockchain backend using Bitcoin Core RPC. Does NOT use Bitcoin Core wallet (avoids BDB issues). Uses scantxoutset and other non-wallet RPC methods. """ def __init__( self, rpc_url: str = "http://127.0.0.1:18443", rpc_user: str = "rpcuser", rpc_password: str = "rpcpassword", scan_timeout: float = SCAN_RPC_TIMEOUT, ): self.rpc_url = rpc_url.rstrip("/") self.rpc_user = rpc_user self.rpc_password = rpc_password self.scan_timeout = scan_timeout # Client for regular RPC calls self.client = httpx.AsyncClient(timeout=DEFAULT_RPC_TIMEOUT, auth=(rpc_user, rpc_password)) # Separate client for long-running scans self._scan_client = httpx.AsyncClient(timeout=scan_timeout, auth=(rpc_user, rpc_password)) self._request_id = 0 async def _rpc_call( self, method: str, params: list | None = None, client: httpx.AsyncClient | None = None, ) -> Any: """ Make an RPC call to Bitcoin Core. Args: method: RPC method name params: Method parameters client: Optional httpx client (uses default client if not provided) Returns: RPC result Raises: ValueError: On RPC errors httpx.HTTPError: On connection/timeout errors """ self._request_id += 1 payload = { "jsonrpc": "2.0", "id": self._request_id, "method": method, "params": params or [], } use_client = client or self.client try: response = await use_client.post(self.rpc_url, json=payload) response.raise_for_status() data = response.json() if "error" in data and data["error"]: error_info = data["error"] error_code = error_info.get("code", "unknown") error_msg = error_info.get("message", str(error_info)) raise ValueError(f"RPC error {error_code}: {error_msg}") return data.get("result") except httpx.TimeoutException as e: logger.error(f"RPC call timed out: {method} - {e}") raise except httpx.HTTPError as e: logger.error(f"RPC call failed: {method} - {e}") raise async def _scantxoutset_with_retry( self, descriptors: Sequence[str | dict[str, Any]] ) -> dict[str, Any] | None: """ Execute scantxoutset with retry logic for handling concurrent scan conflicts. Bitcoin Core only allows one scantxoutset at a time. This method: 1. Checks if a scan is already in progress 2. If so, waits for it to complete (via status polling) before starting ours 3. Starts our scan with extended timeout for mainnet Args: descriptors: List of output descriptors to scan for. Can be: - Simple strings: "addr(bc1q...)" - Dicts with range: {"desc": "wpkh([fp/84'/0'/0'/0/*)", "range": [0, 999]} Returns: Scan result dict or None if all retries failed """ for attempt in range(SCAN_MAX_RETRIES): try: # First check if a scan is already running status = await self._rpc_call("scantxoutset", ["status"]) if status is not None: # A scan is in progress - wait for it # Bitcoin Core returns progress as 0-100, not 0-1 progress = status.get("progress", 0) / 100.0 logger.debug( f"Another scan in progress ({progress:.1%}), waiting... " f"(attempt {attempt + 1}/{SCAN_MAX_RETRIES})" ) if attempt < SCAN_MAX_RETRIES - 1: await asyncio.sleep(SCAN_STATUS_POLL_INTERVAL) continue # Start our scan with extended timeout logger.debug(f"Starting UTXO scan for {len(descriptors)} descriptor(s)...") if SENSITIVE_LOGGING: logger.debug(f"Descriptors for scan: {descriptors}") result = await self._rpc_call( "scantxoutset", ["start", descriptors], client=self._scan_client ) if result: unspent_count = len(result.get("unspents", [])) total_amount = result.get("total_amount", 0) logger.debug( f"Scan completed: found {unspent_count} UTXOs, total {total_amount:.8f} BTC" ) if SENSITIVE_LOGGING and unspent_count > 0: logger.debug(f"Scan result: {result}") return result except ValueError as e: error_str = str(e) # Check for "scan already in progress" error (code -8) if "code': -8" in error_str or "Scan already in progress" in error_str: if attempt < SCAN_MAX_RETRIES - 1: delay = SCAN_BASE_DELAY * (2**attempt) + random.uniform(0, 0.5) logger.debug( f"Scan in progress (RPC error), retrying in {delay:.2f}s " f"(attempt {attempt + 1}/{SCAN_MAX_RETRIES})" ) await asyncio.sleep(delay) continue else: logger.warning( f"Max retries ({SCAN_MAX_RETRIES}) exceeded waiting for scan slot" ) return None else: # Other RPC errors - log and re-raise logger.error(f"scantxoutset RPC error: {error_str}") raise except httpx.TimeoutException: # Timeout during scan - this is a real failure on mainnet logger.error( f"scantxoutset timed out after {self.scan_timeout}s. " "Try increasing scan_timeout for mainnet." ) return None except Exception as e: logger.error(f"Unexpected error during scantxoutset: {type(e).__name__}: {e}") raise logger.warning(f"scantxoutset failed after {SCAN_MAX_RETRIES} attempts") return None async def get_utxos(self, addresses: list[str]) -> list[UTXO]: utxos: list[UTXO] = [] if not addresses: return utxos # Get tip height once for confirmation calculation try: tip_height = await self.get_block_height() except Exception as e: logger.error(f"Failed to get block height for UTXO scan: {e}") return utxos # Process in batches to avoid huge RPC requests batch_size = 100 for i in range(0, len(addresses), batch_size): chunk = addresses[i : i + batch_size] descriptors = [f"addr({addr})" for addr in chunk] if SENSITIVE_LOGGING: logger.debug(f"Scanning addresses batch {i // batch_size + 1}: {chunk}") try: # Scan for all addresses in this chunk at once (with retry for conflicts) result = await self._scantxoutset_with_retry(descriptors) if not result or "unspents" not in result: continue for utxo_data in result["unspents"]: confirmations = 0 if utxo_data.get("height", 0) > 0: confirmations = tip_height - utxo_data["height"] + 1 # Extract address from descriptor "addr(ADDRESS)#checksum" or "addr(ADDRESS)" desc = utxo_data.get("desc", "") # Remove checksum if present if "#" in desc: desc = desc.split("#")[0] address = "" if desc.startswith("addr(") and desc.endswith(")"): address = desc[5:-1] else: # Only log warning if we really can't parse it (and it's not empty) if desc: logger.warning(f"Failed to parse address from descriptor: '{desc}'") utxo = UTXO( txid=utxo_data["txid"], vout=utxo_data["vout"], value=btc_to_sats(utxo_data["amount"]), address=address, confirmations=confirmations, scriptpubkey=utxo_data.get("scriptPubKey", ""), height=utxo_data.get("height"), ) utxos.append(utxo) logger.debug( f"Scanned {len(chunk)} addresses, found {len(result['unspents'])} UTXOs" ) except Exception as e: logger.warning(f"Failed to scan UTXOs for batch starting {chunk[0]}: {e}") continue return utxos async def scan_descriptors( self, descriptors: Sequence[str | dict[str, Any]] ) -> dict[str, Any] | None: """ Scan the UTXO set using output descriptors. This is much more efficient than scanning individual addresses, especially for HD wallets where you can use xpub descriptors with ranges to scan thousands of addresses in a single UTXO set pass. Example descriptors: - "addr(bc1q...)" - single address - "wpkh(xpub.../0/*)" - HD wallet external addresses (default range 0-1000) - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range Args: descriptors: List of output descriptors (strings or dicts with range) Returns: Raw scan result dict from Bitcoin Core, or None on failure. Result includes: - success: bool - txouts: number of UTXOs scanned - height: current block height - unspents: list of found UTXOs with txid, vout, scriptPubKey, desc (matched descriptor), amount, height - total_amount: sum of all found UTXOs """ if not descriptors: return {"success": True, "unspents": [], "total_amount": 0} logger.info(f"Starting descriptor scan with {len(descriptors)} descriptor(s)...") result = await self._scantxoutset_with_retry(descriptors) if result: unspent_count = len(result.get("unspents", [])) total = result.get("total_amount", 0) logger.info( f"Descriptor scan complete: found {unspent_count} UTXOs, total {total:.8f} BTC" ) else: logger.warning("Descriptor scan failed or returned no results") return result async def get_address_balance(self, address: str) -> int: utxos = await self.get_utxos([address]) balance = sum(utxo.value for utxo in utxos) logger.debug(f"Balance for {address}: {balance} sats") return balance async def broadcast_transaction(self, tx_hex: str) -> str: try: txid = await self._rpc_call("sendrawtransaction", [tx_hex]) logger.info(f"Broadcast transaction: {txid}") return txid except Exception as e: logger.error(f"Failed to broadcast transaction: {e}") raise ValueError(f"Broadcast failed: {e}") from e async def get_transaction(self, txid: str) -> Transaction | None: try: tx_data = await self._rpc_call("getrawtransaction", [txid, True]) if not tx_data: return None confirmations = tx_data.get("confirmations", 0) block_height = None block_time = None if "blockhash" in tx_data: block_info = await self._rpc_call("getblockheader", [tx_data["blockhash"]]) block_height = block_info.get("height") block_time = block_info.get("time") raw_hex = tx_data.get("hex", "") return Transaction( txid=txid, raw=raw_hex, confirmations=confirmations, block_height=block_height, block_time=block_time, ) except Exception as e: logger.warning(f"Failed to fetch transaction {txid}: {e}") return None async def estimate_fee(self, target_blocks: int) -> int: try: result = await self._rpc_call("estimatesmartfee", [target_blocks]) if "feerate" in result: btc_per_kb = result["feerate"] # Convert BTC/kB to sat/vB sat_per_vbyte = round(btc_to_sats(btc_per_kb) / 1000) logger.debug(f"Estimated fee for {target_blocks} blocks: {sat_per_vbyte} sat/vB") return sat_per_vbyte else: logger.warning("Fee estimation unavailable, using fallback") return 10 except Exception as e: logger.warning(f"Failed to estimate fee: {e}, using fallback") return 10 async def get_block_height(self) -> int: try: info = await self._rpc_call("getblockchaininfo", []) height = info.get("blocks", 0) logger.debug(f"Current block height: {height}") return height except Exception as e: logger.error(f"Failed to fetch block height: {e}") raise async def get_block_time(self, block_height: int) -> int: try: block_hash = await self.get_block_hash(block_height) block_header = await self._rpc_call("getblockheader", [block_hash]) timestamp = block_header.get("time", 0) logger.debug(f"Block {block_height} timestamp: {timestamp}") return timestamp except Exception as e: logger.error(f"Failed to fetch block time for height {block_height}: {e}") raise async def get_block_hash(self, block_height: int) -> str: try: block_hash = await self._rpc_call("getblockhash", [block_height]) logger.debug(f"Block hash for height {block_height}: {block_hash}") return block_hash except Exception as e: logger.error(f"Failed to fetch block hash for height {block_height}: {e}") raise async def get_utxo(self, txid: str, vout: int) -> UTXO | None: """Get a specific UTXO from the blockchain UTXO set using gettxout. Returns None if the UTXO does not exist or has been spent. If not found in confirmed UTXO set, checks mempool for unconfirmed transactions. """ try: # gettxout returns None if UTXO doesn't exist or is spent # include_mempool=True checks both confirmed and unconfirmed outputs result = await self._rpc_call("gettxout", [txid, vout, True]) if result is None: # Not found in UTXO set - check if it's in mempool (unconfirmed) logger.debug( f"UTXO {txid}:{vout} not found in confirmed UTXO set, checking mempool..." ) try: # Get raw transaction from mempool tx_data = await self._rpc_call("getrawtransaction", [txid, True]) if tx_data and "vout" in tx_data: # Check if the vout exists and hasn't been spent if vout < len(tx_data["vout"]): vout_data = tx_data["vout"][vout] value = btc_to_sats(vout_data.get("value", 0)) # Extract address from scriptPubKey script_pub_key = vout_data.get("scriptPubKey", {}) address = script_pub_key.get("address", "") # For multiple addresses (e.g., multisig), join them if not address and "addresses" in script_pub_key: addresses = script_pub_key.get("addresses", []) address = addresses[0] if addresses else "" scriptpubkey = script_pub_key.get("hex", "") # Unconfirmed transaction has 0 confirmations logger.info(f"Found UTXO {txid}:{vout} in mempool (unconfirmed)") return UTXO( txid=txid, vout=vout, value=value, address=address, confirmations=0, scriptpubkey=scriptpubkey, height=None, ) except Exception as mempool_err: logger.debug(f"UTXO {txid}:{vout} not in mempool either: {mempool_err}") logger.debug(f"UTXO {txid}:{vout} not found (spent or doesn't exist)") return None # Get tip height for confirmation calculation tip_height = await self.get_block_height() confirmations = result.get("confirmations", 0) value = btc_to_sats(result.get("value", 0)) # BTC to sats # Extract address from scriptPubKey script_pub_key = result.get("scriptPubKey", {}) address = script_pub_key.get("address", "") scriptpubkey = script_pub_key.get("hex", "") # Calculate height from confirmations height = None if confirmations > 0: height = tip_height - confirmations + 1 return UTXO( txid=txid, vout=vout, value=value, address=address, confirmations=confirmations, scriptpubkey=scriptpubkey, height=height, ) except Exception as e: logger.error(f"Failed to get UTXO {txid}:{vout}: {e}") return None def can_provide_neutrino_metadata(self) -> bool: """ Bitcoin Core can provide Neutrino-compatible metadata. Full node can access scriptpubkey and blockheight for all UTXOs, allowing Neutrino takers to use our makers. Returns: True - Bitcoin Core always provides extended UTXO metadata """ return True async def close(self) -> None: await self.client.aclose() await self._scan_client.aclose()Blockchain backend using Bitcoin Core RPC. Does NOT use Bitcoin Core wallet (avoids BDB issues). Uses scantxoutset and other non-wallet RPC methods.
Ancestors
- BlockchainBackend
- abc.ABC
Methods
def can_provide_neutrino_metadata(self) ‑> bool-
Expand source code
def can_provide_neutrino_metadata(self) -> bool: """ Bitcoin Core can provide Neutrino-compatible metadata. Full node can access scriptpubkey and blockheight for all UTXOs, allowing Neutrino takers to use our makers. Returns: True - Bitcoin Core always provides extended UTXO metadata """ return TrueBitcoin Core can provide Neutrino-compatible metadata.
Full node can access scriptpubkey and blockheight for all UTXOs, allowing Neutrino takers to use our makers.
Returns
True - Bitcoin Core always provides extended UTXO metadata
async def get_utxo(self, txid: str, vout: int) ‑> UTXO | None-
Expand source code
async def get_utxo(self, txid: str, vout: int) -> UTXO | None: """Get a specific UTXO from the blockchain UTXO set using gettxout. Returns None if the UTXO does not exist or has been spent. If not found in confirmed UTXO set, checks mempool for unconfirmed transactions. """ try: # gettxout returns None if UTXO doesn't exist or is spent # include_mempool=True checks both confirmed and unconfirmed outputs result = await self._rpc_call("gettxout", [txid, vout, True]) if result is None: # Not found in UTXO set - check if it's in mempool (unconfirmed) logger.debug( f"UTXO {txid}:{vout} not found in confirmed UTXO set, checking mempool..." ) try: # Get raw transaction from mempool tx_data = await self._rpc_call("getrawtransaction", [txid, True]) if tx_data and "vout" in tx_data: # Check if the vout exists and hasn't been spent if vout < len(tx_data["vout"]): vout_data = tx_data["vout"][vout] value = btc_to_sats(vout_data.get("value", 0)) # Extract address from scriptPubKey script_pub_key = vout_data.get("scriptPubKey", {}) address = script_pub_key.get("address", "") # For multiple addresses (e.g., multisig), join them if not address and "addresses" in script_pub_key: addresses = script_pub_key.get("addresses", []) address = addresses[0] if addresses else "" scriptpubkey = script_pub_key.get("hex", "") # Unconfirmed transaction has 0 confirmations logger.info(f"Found UTXO {txid}:{vout} in mempool (unconfirmed)") return UTXO( txid=txid, vout=vout, value=value, address=address, confirmations=0, scriptpubkey=scriptpubkey, height=None, ) except Exception as mempool_err: logger.debug(f"UTXO {txid}:{vout} not in mempool either: {mempool_err}") logger.debug(f"UTXO {txid}:{vout} not found (spent or doesn't exist)") return None # Get tip height for confirmation calculation tip_height = await self.get_block_height() confirmations = result.get("confirmations", 0) value = btc_to_sats(result.get("value", 0)) # BTC to sats # Extract address from scriptPubKey script_pub_key = result.get("scriptPubKey", {}) address = script_pub_key.get("address", "") scriptpubkey = script_pub_key.get("hex", "") # Calculate height from confirmations height = None if confirmations > 0: height = tip_height - confirmations + 1 return UTXO( txid=txid, vout=vout, value=value, address=address, confirmations=confirmations, scriptpubkey=scriptpubkey, height=height, ) except Exception as e: logger.error(f"Failed to get UTXO {txid}:{vout}: {e}") return NoneGet a specific UTXO from the blockchain UTXO set using gettxout. Returns None if the UTXO does not exist or has been spent.
If not found in confirmed UTXO set, checks mempool for unconfirmed transactions.
async def scan_descriptors(self, descriptors: Sequence[str | dict[str, Any]]) ‑> dict[str, typing.Any] | None-
Expand source code
async def scan_descriptors( self, descriptors: Sequence[str | dict[str, Any]] ) -> dict[str, Any] | None: """ Scan the UTXO set using output descriptors. This is much more efficient than scanning individual addresses, especially for HD wallets where you can use xpub descriptors with ranges to scan thousands of addresses in a single UTXO set pass. Example descriptors: - "addr(bc1q...)" - single address - "wpkh(xpub.../0/*)" - HD wallet external addresses (default range 0-1000) - {"desc": "wpkh(xpub.../0/*)", "range": [0, 999]} - explicit range Args: descriptors: List of output descriptors (strings or dicts with range) Returns: Raw scan result dict from Bitcoin Core, or None on failure. Result includes: - success: bool - txouts: number of UTXOs scanned - height: current block height - unspents: list of found UTXOs with txid, vout, scriptPubKey, desc (matched descriptor), amount, height - total_amount: sum of all found UTXOs """ if not descriptors: return {"success": True, "unspents": [], "total_amount": 0} logger.info(f"Starting descriptor scan with {len(descriptors)} descriptor(s)...") result = await self._scantxoutset_with_retry(descriptors) if result: unspent_count = len(result.get("unspents", [])) total = result.get("total_amount", 0) logger.info( f"Descriptor scan complete: found {unspent_count} UTXOs, total {total:.8f} BTC" ) else: logger.warning("Descriptor scan failed or returned no results") return resultScan the UTXO set using output descriptors.
This is much more efficient than scanning individual addresses, especially for HD wallets where you can use xpub descriptors with ranges to scan thousands of addresses in a single UTXO set pass.
Example descriptors: - "addr(bc1q…)" - single address - "wpkh(xpub…/0/)" - HD wallet external addresses (default range 0-1000) - {"desc": "wpkh(xpub…/0/)", "range": [0, 999]} - explicit range
Args
descriptors- List of output descriptors (strings or dicts with range)
Returns
Raw scan result dict from Bitcoin Core, or None on failure. Result includes: - success: bool - txouts: number of UTXOs scanned - height: current block height - unspents: list of found UTXOs with txid, vout, scriptPubKey, desc (matched descriptor), amount, height - total_amount: sum of all found UTXOs
Inherited members