Module maker.fidelity
Fidelity bond utilities for maker bot.
Functions
def create_fidelity_bond_proof(bond: FidelityBondInfo,
maker_nick: str,
taker_nick: str,
current_block_height: int) ‑> str | None-
Expand source code
def create_fidelity_bond_proof( bond: FidelityBondInfo, maker_nick: str, taker_nick: str, current_block_height: int, ) -> str | None: """ Create a fidelity bond proof for broadcasting. The proof structure (252 bytes total): - 72 bytes: Nick signature (signs "taker_nick|maker_nick" with Bitcoin message format) - 72 bytes: Certificate signature (signs cert message with Bitcoin message format) - 33 bytes: Certificate public key (hot wallet key or same as utxo_pub for self-signed) - 2 bytes: Certificate expiry (retarget period number when cert becomes invalid) - 33 bytes: UTXO public key (cold wallet key) - 32 bytes: TXID (little-endian) - 4 bytes: Vout (little-endian) - 4 bytes: Locktime (little-endian) Nick signature message format: (taker_nick + '|' + maker_nick).encode('ascii') Certificate signature message format (binary): b'fidelity-bond-cert|' + cert_pub + b'|' + str(cert_expiry_encoded).encode('ascii') Both signatures use Bitcoin message signing format (double SHA256 with prefix). This function supports two modes: 1. **Self-signed mode** (hot wallet): bond.private_key is available, signs everything 2. **Certificate mode** (cold wallet): bond.cert_* fields are set, uses pre-signed cert Args: bond: FidelityBondInfo with UTXO details and either private key or certificate maker_nick: Maker's JoinMarket nick taker_nick: Target taker's nick (for ownership proof) current_block_height: Current blockchain height (for calculating cert expiry) Returns: Base64-encoded proof string, or None if signing fails """ if not bond.pubkey: logger.error("Bond missing pubkey") return None try: # Determine if we're using a certificate (cold wallet) or self-signing (hot wallet) use_certificate = ( bond.cert_pubkey is not None and bond.cert_privkey is not None and bond.cert_signature is not None and bond.cert_expiry is not None ) if use_certificate: # COLD WALLET MODE: Use pre-signed certificate # These assertions are safe because use_certificate already checks they're not None assert bond.cert_pubkey is not None assert bond.cert_signature is not None assert bond.cert_expiry is not None assert bond.cert_privkey is not None cert_pub = bond.cert_pubkey cert_sig = bond.cert_signature cert_expiry_encoded = bond.cert_expiry utxo_pub = bond.pubkey logger.debug( f"Using certificate mode for bond proof (cert_expiry={cert_expiry_encoded})" ) # Sign nick message with hot wallet cert_privkey nick_msg = (taker_nick + "|" + maker_nick).encode("ascii") nick_sig = _sign_message_bitcoin(bond.cert_privkey, nick_msg) nick_sig_padded = _pad_signature(nick_sig, 72) # Use pre-signed certificate signature cert_sig_padded = _pad_signature(cert_sig, 72) else: # HOT WALLET MODE (SELF-SIGNED): traditional single-key mode if not bond.private_key: logger.error("Bond missing private key (required for self-signed mode)") return None cert_pub = bond.pubkey utxo_pub = bond.pubkey # Calculate certificate expiry as retarget period number # Reference: yieldgenerator.py line 139 # cert_expiry = # ((blocks + BLOCK_COUNT_SAFETY) // RETARGET_INTERVAL) + CERT_MAX_VALIDITY_TIME cert_expiry_encoded = ( (current_block_height + BLOCK_COUNT_SAFETY) // RETARGET_INTERVAL ) + CERT_MAX_VALIDITY_TIME logger.debug( f"Using self-signed mode for bond proof (cert_expiry={cert_expiry_encoded})" ) # 1. Nick signature: proves the maker controls the certificate key # Signs "(taker_nick|maker_nick)" using Bitcoin message format nick_msg = (taker_nick + "|" + maker_nick).encode("ascii") nick_sig = _sign_message_bitcoin(bond.private_key, nick_msg) nick_sig_padded = _pad_signature(nick_sig, 72) # 2. Certificate signature: self-signed certificate # Signs "fidelity-bond-cert|<cert_pub>|<cert_expiry_encoded>" cert_msg = ( b"fidelity-bond-cert|" + cert_pub + b"|" + str(cert_expiry_encoded).encode("ascii") ) cert_sig = _sign_message_bitcoin(bond.private_key, cert_msg) cert_sig_padded = _pad_signature(cert_sig, 72) # 3. Pack the proof # TXID in display format (big-endian, human-readable) - same as how Bitcoin Core # returns txids and how the reference implementation stores them. # Reference: wallet.py line 754 uses tx.GetTxid()[::-1] which converts from # internal (little-endian) to display (big-endian) format. txid_bytes = bytes.fromhex(bond.txid) if len(txid_bytes) != 32: raise ValueError(f"Invalid txid length: {len(txid_bytes)}") proof_data = struct.pack( "<72s72s33sH33s32sII", nick_sig_padded, cert_sig_padded, cert_pub, cert_expiry_encoded, utxo_pub, txid_bytes, bond.vout, bond.locktime, ) if len(proof_data) != 252: raise ValueError(f"Invalid proof length: {len(proof_data)}, expected 252") return base64.b64encode(proof_data).decode("ascii") except Exception as e: logger.error(f"Failed to create bond proof: {e}") return NoneCreate a fidelity bond proof for broadcasting.
The proof structure (252 bytes total): - 72 bytes: Nick signature (signs "taker_nick|maker_nick" with Bitcoin message format) - 72 bytes: Certificate signature (signs cert message with Bitcoin message format) - 33 bytes: Certificate public key (hot wallet key or same as utxo_pub for self-signed) - 2 bytes: Certificate expiry (retarget period number when cert becomes invalid) - 33 bytes: UTXO public key (cold wallet key) - 32 bytes: TXID (little-endian) - 4 bytes: Vout (little-endian) - 4 bytes: Locktime (little-endian)
Nick signature message format: (taker_nick + '|' + maker_nick).encode('ascii')
Certificate signature message format (binary): b'fidelity-bond-cert|' + cert_pub + b'|' + str(cert_expiry_encoded).encode('ascii')
Both signatures use Bitcoin message signing format (double SHA256 with prefix).
This function supports two modes: 1. Self-signed mode (hot wallet): bond.private_key is available, signs everything 2. Certificate mode (cold wallet): bond.cert_* fields are set, uses pre-signed cert
Args
bond- FidelityBondInfo with UTXO details and either private key or certificate
maker_nick- Maker's JoinMarket nick
taker_nick- Target taker's nick (for ownership proof)
current_block_height- Current blockchain height (for calculating cert expiry)
Returns
Base64-encoded proof string, or None if signing fails
async def find_fidelity_bonds(wallet: WalletService, mixdepth: int = 0) ‑> list[FidelityBondInfo]-
Expand source code
async def find_fidelity_bonds( wallet: WalletService, mixdepth: int = FIDELITY_BOND_MIXDEPTH ) -> list[FidelityBondInfo]: """ Find fidelity bonds in the wallet. Fidelity bonds are timelocked UTXOs in mixdepth 0, internal branch 2. Path format: m/84'/coin'/0'/2/index:locktime They use a CLTV script: <locktime> OP_CLTV OP_DROP <pubkey> OP_CHECKSIG This function also loads certificate information from the bond registry if available, allowing for cold wallet support where the bond UTXO private key is kept offline. Args: wallet: WalletService instance mixdepth: Mixdepth to search for bonds (default 0) Returns: List of FidelityBondInfo for each bond found """ bonds: list[FidelityBondInfo] = [] # Try to load bond registry for certificate information from jmwallet.wallet.bond_registry import BondRegistry registry: BondRegistry | None = None try: from pathlib import Path from jmcore.paths import get_default_data_dir from jmwallet.wallet.bond_registry import load_registry data_dir = get_default_data_dir() registry = load_registry(Path(data_dir)) logger.debug(f"Loaded bond registry with {len(registry.bonds)} bonds") except Exception as e: logger.debug(f"Could not load bond registry: {e}") utxos = wallet.utxo_cache.get(mixdepth, []) if not utxos: return bonds for utxo_info in utxos: # Fidelity bonds are on internal branch 2 with locktime in path # Path format: m/84'/coin'/0'/2/index:locktime path_parts = utxo_info.path.split("/") if len(path_parts) < 5: continue # Check if this is internal branch 2 (fidelity bond branch) # path_parts[-2] is the branch (0=external, 1=internal change, 2=fidelity bonds) branch_part = path_parts[-2] if branch_part != str(FIDELITY_BOND_INTERNAL_BRANCH): continue # Extract locktime from path (format: index:locktime) locktime = _parse_locktime_from_path(utxo_info.path) if locktime is None: # Not a timelocked UTXO continue # Check if this is an external bond (index=-1 indicates cold storage) # External bonds have path format: m/84'/0'/0'/2/-1:locktime index_locktime = path_parts[-1] # e.g., "-1:1769904000" or "0:1768435200" index_str = index_locktime.split(":")[0] is_external_bond = index_str == "-1" pubkey: bytes | None = None private_key: PrivateKey | None = None # Check registry for bond info (needed for both external bonds and certificates) registry_bond = None if registry is not None: registry_bond = registry.get_bond_by_address(utxo_info.address) if is_external_bond: # External bond: get pubkey from registry, no private key available if registry_bond: try: pubkey = bytes.fromhex(registry_bond.pubkey) logger.debug( f"External bond {utxo_info.address[:20]}... using pubkey from registry" ) except Exception as e: logger.warning( f"Failed to get pubkey for external bond {utxo_info.address}: {e}" ) if pubkey is None: logger.warning( f"External bond {utxo_info.address[:20]}... not found in registry, skipping" ) continue else: # Hot wallet bond: derive key from wallet key = wallet.get_key_for_address(utxo_info.address) pubkey = key.get_public_key_bytes(compressed=True) if key else None private_key = key.private_key if key else None # Get confirmation_time (Unix timestamp) from block height # For unconfirmed UTXOs (height=None), we can't calculate bond value yet if utxo_info.height is None: logger.warning(f"Skipping unconfirmed bond UTXO {utxo_info.txid}:{utxo_info.vout}") continue confirmation_time = await wallet.backend.get_block_time(utxo_info.height) bond_value = calculate_timelocked_fidelity_bond_value( utxo_value=utxo_info.value, confirmation_time=confirmation_time, locktime=locktime, ) # Check registry for certificate information (cold wallet support) cert_pubkey: bytes | None = None cert_privkey: PrivateKey | None = None cert_signature: bytes | None = None cert_expiry: int | None = None if registry_bond is not None and registry_bond.has_certificate: try: cert_pubkey = bytes.fromhex(registry_bond.cert_pubkey) # type: ignore cert_privkey = PrivateKey( bytes.fromhex(registry_bond.cert_privkey) # type: ignore ) cert_signature = bytes.fromhex(registry_bond.cert_signature) # type: ignore cert_expiry = registry_bond.cert_expiry logger.debug( f"Found certificate for bond {utxo_info.address[:20]}... " f"(expiry: {cert_expiry} periods)" ) except Exception as e: logger.warning(f"Failed to parse certificate for {utxo_info.address}: {e}") bonds.append( FidelityBondInfo( txid=utxo_info.txid, vout=utxo_info.vout, value=utxo_info.value, locktime=locktime, confirmation_time=confirmation_time, bond_value=bond_value, pubkey=pubkey, private_key=private_key, cert_pubkey=cert_pubkey, cert_privkey=cert_privkey, cert_signature=cert_signature, cert_expiry=cert_expiry, ) ) return bondsFind fidelity bonds in the wallet.
Fidelity bonds are timelocked UTXOs in mixdepth 0, internal branch 2. Path format: m/84'/coin'/0'/2/index:locktime They use a CLTV script:
OP_CLTV OP_DROP OP_CHECKSIG This function also loads certificate information from the bond registry if available, allowing for cold wallet support where the bond UTXO private key is kept offline.
Args
wallet- WalletService instance
mixdepth- Mixdepth to search for bonds (default 0)
Returns
List of FidelityBondInfo for each bond found
async def get_best_fidelity_bond(wallet: WalletService, mixdepth: int = 0) ‑> FidelityBondInfo | None-
Expand source code
async def get_best_fidelity_bond( wallet: WalletService, mixdepth: int = FIDELITY_BOND_MIXDEPTH ) -> FidelityBondInfo | None: """ Get the best (highest value) fidelity bond from the wallet. Args: wallet: WalletService instance mixdepth: Mixdepth to search Returns: Best FidelityBondInfo or None if no bonds found """ bonds = await find_fidelity_bonds(wallet, mixdepth) if not bonds: return None return max(bonds, key=lambda b: b.bond_value)Get the best (highest value) fidelity bond from the wallet.
Args
wallet- WalletService instance
mixdepth- Mixdepth to search
Returns
Best FidelityBondInfo or None if no bonds found
Classes
class FidelityBondInfo (*args: Any, **kwargs: Any)-
Expand source code
@dataclass(config=ConfigDict(arbitrary_types_allowed=True)) class FidelityBondInfo: txid: str vout: int value: int locktime: int confirmation_time: int bond_value: int pubkey: bytes | None = None private_key: PrivateKey | None = None # Certificate fields (for cold wallet support) # When set, the bond proof uses the certificate chain instead of self-signing cert_pubkey: bytes | None = None # Hot wallet certificate public key cert_privkey: PrivateKey | None = None # Hot wallet private key for signing nicks cert_signature: bytes | None = None # Certificate signature by UTXO key cert_expiry: int | None = None # Certificate expiry in 2016-block periodsInstance variables
var bond_value : int-
The type of the None singleton.
var cert_expiry : int | None-
The type of the None singleton.
var cert_privkey : coincurve.keys.PrivateKey | None-
The type of the None singleton.
var cert_pubkey : bytes | None-
The type of the None singleton.
var cert_signature : bytes | None-
The type of the None singleton.
var confirmation_time : int-
The type of the None singleton.
var locktime : int-
The type of the None singleton.
var private_key : coincurve.keys.PrivateKey | None-
The type of the None singleton.
var pubkey : bytes | None-
The type of the None singleton.
var txid : str-
The type of the None singleton.
var value : int-
The type of the None singleton.
var vout : int-
The type of the None singleton.