Module jmwallet.wallet

Wallet functionality for JoinMarket.

Sub-modules

jmwallet.wallet.address

Bitcoin address generation utilities …

jmwallet.wallet.bip32

BIP32 HD key derivation for JoinMarket wallets. Implements BIP84 (Native SegWit) derivation paths.

jmwallet.wallet.bond_registry

Fidelity bond registry for persistent storage of bond metadata …

jmwallet.wallet.models

Wallet data models.

jmwallet.wallet.service

JoinMarket wallet service with mixdepth support.

jmwallet.wallet.signing

Bitcoin transaction signing utilities for P2WPKH and P2WSH inputs.

Functions

def create_bond_info(address: str,
locktime: int,
index: int,
path: str,
pubkey_hex: str,
witness_script: bytes,
network: str) ‑> FidelityBondInfo
Expand source code
def create_bond_info(
    address: str,
    locktime: int,
    index: int,
    path: str,
    pubkey_hex: str,
    witness_script: bytes,
    network: str,
) -> FidelityBondInfo:
    """
    Build the registry record describing a fidelity bond.

    Only the derivation/script metadata is filled in here; the UTXO fields
    (txid, vout, value, confirmations) are not passed and keep the model's
    defaults.

    Args:
        address: The P2WSH address
        locktime: Unix timestamp locktime
        index: Derivation index
        path: Full derivation path
        pubkey_hex: Public key as hex
        witness_script: The witness script bytes (stored hex-encoded)
        network: Network name

    Returns:
        FidelityBondInfo instance
    """
    # NOTE(review): fromtimestamp() without a tz argument renders the locktime
    # in the machine's local timezone - confirm that is intended.
    human_locktime = datetime.fromtimestamp(locktime).strftime("%Y-%m-%d %H:%M:%S")
    script_hex = witness_script.hex()
    # Creation time is recorded as a naive ISO-8601 string.
    creation_stamp = datetime.now().isoformat()

    return FidelityBondInfo(
        address=address,
        locktime=locktime,
        locktime_human=human_locktime,
        index=index,
        path=path,
        pubkey=pubkey_hex,
        witness_script_hex=script_hex,
        network=network,
        created_at=creation_stamp,
    )

Create a FidelityBondInfo instance.

Args

address
The P2WSH address
locktime
Unix timestamp locktime
index
Derivation index
path
Full derivation path
pubkey_hex
Public key as hex
witness_script
The witness script bytes
network
Network name

Returns

FidelityBondInfo instance

def load_registry(data_dir: Path) ‑> BondRegistry
Expand source code
def load_registry(data_dir: Path) -> BondRegistry:
    """
    Read the bond registry stored under ``data_dir``.

    Args:
        data_dir: Data directory path

    Returns:
        The parsed BondRegistry. An empty registry is returned when the file
        does not exist, and also when it cannot be read or validated (the
        failure is logged and the file on disk is left untouched).
    """
    registry_path = get_registry_path(data_dir)

    if registry_path.exists():
        try:
            raw_text = registry_path.read_text()
            return BondRegistry.model_validate(json.loads(raw_text))
        except Exception as e:
            logger.error(f"Failed to load bond registry: {e}")
            # Fall through to an empty registry; the file is not overwritten here.

    return BondRegistry()

Load the bond registry from disk.

Args

data_dir
Data directory path

Returns

BondRegistry instance (empty if file doesn't exist)

def save_registry(registry: BondRegistry,
data_dir: Path) ‑> None
Expand source code
def save_registry(registry: BondRegistry, data_dir: Path) -> None:
    """
    Save the bond registry to disk.

    The JSON is first written to a sibling ``<name>.tmp`` file and then moved
    over the real registry file. Writing in place would truncate the existing
    registry before the new content is complete, so a crash or write error
    could leave a corrupt file that ``load_registry`` treats as empty.

    Args:
        registry: BondRegistry instance
        data_dir: Data directory path

    Raises:
        Exception: Any error raised while serializing or writing is logged
            and re-raised unchanged.
    """
    registry_path = get_registry_path(data_dir)
    registry_path.parent.mkdir(parents=True, exist_ok=True)

    # Same directory as the target so the final rename stays on one filesystem.
    tmp_path = registry_path.with_name(registry_path.name + ".tmp")

    try:
        tmp_path.write_text(registry.model_dump_json(indent=2))
        # Path.replace overwrites the destination in a single rename step.
        tmp_path.replace(registry_path)
        logger.debug(f"Saved bond registry to {registry_path}")
    except Exception as e:
        logger.error(f"Failed to save bond registry: {e}")
        # Best-effort cleanup of the partial temp file; never mask the real error.
        try:
            tmp_path.unlink(missing_ok=True)
        except OSError:
            pass
        raise

Save the bond registry to disk.

Args

registry
BondRegistry instance
data_dir
Data directory path

Classes

class BondRegistry (**data: Any)
Expand source code
class BondRegistry(BaseModel):
    """Collection of all fidelity bonds known for a wallet, with lookup helpers."""

    version: int = 1
    bonds: list[FidelityBondInfo] = []

    def add_bond(self, bond: FidelityBondInfo) -> None:
        """Insert ``bond``; an existing entry with the same address is replaced."""
        duplicate = next((b for b in self.bonds if b.address == bond.address), None)
        if duplicate is not None:
            logger.warning(f"Bond with address {bond.address} already exists, updating")
            self.bonds.remove(duplicate)
        # The new entry always goes to the end of the list.
        self.bonds.append(bond)

    def get_bond_by_address(self, address: str) -> FidelityBondInfo | None:
        """Return the first bond whose address equals ``address``, else None."""
        return next((b for b in self.bonds if b.address == address), None)

    def get_bond_by_index(self, index: int, locktime: int) -> FidelityBondInfo | None:
        """Return the first bond matching both ``index`` and ``locktime``, else None."""
        return next(
            (b for b in self.bonds if b.index == index and b.locktime == locktime),
            None,
        )

    def get_funded_bonds(self) -> list[FidelityBondInfo]:
        """Return the bonds that report themselves as funded, in registry order."""
        funded = []
        for candidate in self.bonds:
            if candidate.is_funded:
                funded.append(candidate)
        return funded

    def get_active_bonds(self) -> list[FidelityBondInfo]:
        """Return the bonds that are funded and whose locktime has not passed."""
        active = []
        for candidate in self.bonds:
            if not candidate.is_funded:
                continue
            if candidate.is_expired:
                continue
            active.append(candidate)
        return active

    def get_best_bond(self) -> FidelityBondInfo | None:
        """
        Pick the bond to advertise.

        Only active (funded, unexpired) bonds are considered. The highest
        value wins; ties are broken by the longest remaining lock time, and
        remaining ties by registry order. Returns None when no bond is active.
        """
        candidates = self.get_active_bonds()
        if not candidates:
            return None

        def rank(b: FidelityBondInfo) -> tuple[int, int]:
            # A missing value ranks as 0.
            return (b.value or 0, b.time_until_unlock)

        # max() keeps the first of equally ranked bonds, matching a stable
        # descending sort followed by taking element 0.
        return max(candidates, key=rank)

    def update_utxo_info(
        self,
        address: str,
        txid: str,
        vout: int,
        value: int,
        confirmations: int,
    ) -> bool:
        """Record funding UTXO details on the bond at ``address``; False if unknown."""
        target = self.get_bond_by_address(address)
        if not target:
            return False

        target.txid = txid
        target.vout = vout
        target.value = value
        target.confirmations = confirmations
        return True

Registry of all fidelity bonds for a wallet.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var bonds : list[FidelityBondInfo]

The fidelity bonds tracked by this registry. Defaults to an empty list.

var model_config

The type of the None singleton.

var version : int

Version number stored with the registry. Defaults to 1.

Methods

def add_bond(self,
bond: FidelityBondInfo) ‑> None
Expand source code
def add_bond(self, bond: FidelityBondInfo) -> None:
    """Insert ``bond``; an existing entry with the same address is replaced."""
    duplicate = next((b for b in self.bonds if b.address == bond.address), None)
    if duplicate is not None:
        logger.warning(f"Bond with address {bond.address} already exists, updating")
        self.bonds.remove(duplicate)
    # The new entry always goes to the end of the list.
    self.bonds.append(bond)

Add a new bond to the registry.

def get_active_bonds(self) ‑> list[FidelityBondInfo]
Expand source code
def get_active_bonds(self) -> list[FidelityBondInfo]:
    """Return the bonds that are funded and whose locktime has not passed."""
    active = []
    for candidate in self.bonds:
        if not candidate.is_funded:
            continue
        if candidate.is_expired:
            continue
        active.append(candidate)
    return active

Get all funded bonds that are not yet expired.

def get_best_bond(self) ‑> FidelityBondInfo | None
Expand source code
def get_best_bond(self) -> FidelityBondInfo | None:
    """
    Pick the bond to advertise.

    Only active (funded, unexpired) bonds are considered. The highest value
    wins; ties are broken by the longest remaining lock time, and remaining
    ties by the order returned from ``get_active_bonds``. Returns None when
    no bond is active.
    """
    candidates = self.get_active_bonds()
    if not candidates:
        return None

    def rank(b: FidelityBondInfo) -> tuple[int, int]:
        # A missing value ranks as 0.
        return (b.value or 0, b.time_until_unlock)

    # max() keeps the first of equally ranked bonds, matching a stable
    # descending sort followed by taking element 0.
    return max(candidates, key=rank)

Get the best bond for advertising.

Selection criteria (in order): 1. Must be funded 2. Must not be expired 3. Highest value wins 4. If tied, longest locktime remaining wins

def get_bond_by_address(self, address: str) ‑> FidelityBondInfo | None
Expand source code
def get_bond_by_address(self, address: str) -> FidelityBondInfo | None:
    """Return the first bond whose address equals ``address``, else None."""
    return next((b for b in self.bonds if b.address == address), None)

Get a bond by its address.

def get_bond_by_index(self, index: int, locktime: int) ‑> FidelityBondInfo | None
Expand source code
def get_bond_by_index(self, index: int, locktime: int) -> FidelityBondInfo | None:
    """Return the first bond matching both ``index`` and ``locktime``, else None."""
    return next(
        (b for b in self.bonds if b.index == index and b.locktime == locktime),
        None,
    )

Get a bond by its index and locktime.

def get_funded_bonds(self) ‑> list[FidelityBondInfo]
Expand source code
def get_funded_bonds(self) -> list[FidelityBondInfo]:
    """Return the bonds that report themselves as funded, in registry order."""
    funded = []
    for candidate in self.bonds:
        if candidate.is_funded:
            funded.append(candidate)
    return funded

Get all funded bonds.

def update_utxo_info(self, address: str, txid: str, vout: int, value: int, confirmations: int) ‑> bool
Expand source code
def update_utxo_info(
    self,
    address: str,
    txid: str,
    vout: int,
    value: int,
    confirmations: int,
) -> bool:
    """
    Record funding UTXO details on the bond registered at ``address``.

    Returns:
        True when a bond was found and updated, False when no bond has
        that address (nothing is modified in that case).
    """
    target = self.get_bond_by_address(address)
    if not target:
        return False

    target.txid = txid
    target.vout = vout
    target.value = value
    target.confirmations = confirmations
    return True

Update UTXO information for a bond.

class CoinSelection (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class CoinSelection:
    """
    Result of coin selection.

    Plain value container: the chosen UTXOs together with three integer
    amounts. Field order is part of the positional constructor signature.
    """

    # UTXOs chosen by the selection.
    utxos: list[UTXOInfo]
    # NOTE(review): presumably the summed value of ``utxos`` - confirm at the producer.
    total_value: int
    # Amount designated as change.
    change_value: int
    # Fee amount associated with the selection.
    fee: int

Result of coin selection

Instance variables

var change_value : int

The type of the None singleton.

var fee : int

The type of the None singleton.

var total_value : int

The type of the None singleton.

var utxos : list[UTXOInfo]

The type of the None singleton.

class FidelityBondInfo (**data: Any)
Expand source code
class FidelityBondInfo(BaseModel):
    """Metadata for one fidelity bond, plus its funding UTXO once known."""

    address: str
    locktime: int
    locktime_human: str
    index: int
    path: str
    pubkey: str
    witness_script_hex: str
    network: str
    created_at: str
    # Funding UTXO details; all stay None until the bond is funded.
    txid: str | None = None
    vout: int | None = None
    value: int | None = None  # satoshis
    confirmations: int | None = None

    @property
    def is_funded(self) -> bool:
        """True when a txid is recorded and the recorded value is positive."""
        if self.txid is None or self.value is None:
            return False
        return self.value > 0

    @property
    def is_expired(self) -> bool:
        """True once the current time has reached the locktime."""
        import time

        now = time.time()
        return self.locktime <= now

    @property
    def time_until_unlock(self) -> int:
        """Whole seconds left before the locktime; 0 when it has already passed."""
        import time

        seconds_left = self.locktime - int(time.time())
        return seconds_left if seconds_left > 0 else 0

Information about a single fidelity bond.

Create a new model by parsing and validating input data from keyword arguments.

Raises [ValidationError][pydantic_core.ValidationError] if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Ancestors

  • pydantic.main.BaseModel

Class variables

var address : str

The bond's P2WSH address.

var confirmations : int | None

Confirmation count of the funding UTXO. None until UTXO info is populated (when the bond is funded).

var created_at : str

The type of the None singleton.

var index : int

The type of the None singleton.

var locktime : int

Locktime as a Unix timestamp.

var locktime_human : str

The locktime formatted as "YYYY-MM-DD HH:MM:SS" for display.

var model_config

The type of the None singleton.

var network : str

The type of the None singleton.

var path : str

The type of the None singleton.

var pubkey : str

The type of the None singleton.

var txid : str | None

Transaction ID of the funding UTXO. None until UTXO info is populated (when the bond is funded).

var value : int | None

Value of the funding UTXO in satoshis. None until UTXO info is populated (when the bond is funded).

var vout : int | None

Output index of the funding UTXO. None until UTXO info is populated (when the bond is funded).

var witness_script_hex : str

The bond's witness script, hex-encoded.

Instance variables

prop is_expired : bool
Expand source code
@property
def is_expired(self) -> bool:
    """True once the current time has reached the locktime."""
    import time

    now = time.time()
    return self.locktime <= now

Check if the locktime has passed.

prop is_funded : bool
Expand source code
@property
def is_funded(self) -> bool:
    """True when a txid is recorded and the recorded value is positive."""
    if self.txid is None or self.value is None:
        return False
    return self.value > 0

Check if this bond has been funded.

prop time_until_unlock : int
Expand source code
@property
def time_until_unlock(self) -> int:
    """Whole seconds left before the locktime; 0 when it has already passed."""
    import time

    seconds_left = self.locktime - int(time.time())
    return seconds_left if seconds_left > 0 else 0

Seconds until the bond can be unlocked. Returns 0 if already expired.

class HDKey (private_key: PrivateKey,
chain_code: bytes,
depth: int = 0,
parent_fingerprint: bytes = b'\x00\x00\x00\x00',
child_number: int = 0)
Expand source code
class HDKey:
    """
    Hierarchical Deterministic Key for Bitcoin.
    Implements BIP32 derivation.

    Holds a private key together with the BIP32 metadata (chain code, depth,
    parent fingerprint, child number) used to derive children and to
    serialize the key as an extended public/private key.
    """

    def __init__(
        self,
        private_key: PrivateKey,
        chain_code: bytes,
        depth: int = 0,
        parent_fingerprint: bytes = b"\x00\x00\x00\x00",
        child_number: int = 0,
    ):
        """
        Args:
            private_key: Private key object; its ``public_key`` is cached.
            chain_code: Chain code used as the HMAC key for child derivation.
            depth: Number of derivation steps from the master key.
            parent_fingerprint: 4-byte fingerprint of the parent key.
            child_number: Index this key was derived at (hardened bit included).
        """
        self._private_key = private_key
        self._public_key = private_key.public_key
        self.chain_code = chain_code
        self.depth = depth
        self.parent_fingerprint = parent_fingerprint
        self.child_number = child_number

    @property
    def private_key(self) -> PrivateKey:
        """Return the coincurve PrivateKey instance."""
        return self._private_key

    @property
    def public_key(self) -> PublicKey:
        """Return the coincurve PublicKey instance."""
        return self._public_key

    @property
    def fingerprint(self) -> bytes:
        """Get the fingerprint of this key (first 4 bytes of hash160 of public key)."""
        pubkey_bytes = self._public_key.format(compressed=True)
        sha256_hash = hashlib.sha256(pubkey_bytes).digest()
        ripemd160_hash = hashlib.new("ripemd160", sha256_hash).digest()
        return ripemd160_hash[:4]

    @classmethod
    def from_seed(cls, seed: bytes) -> HDKey:
        """Create master HD key from seed (HMAC-SHA512 keyed with b"Bitcoin seed")."""
        hmac_result = hmac.new(b"Bitcoin seed", seed, hashlib.sha512).digest()
        # Left 32 bytes are the master secret, right 32 bytes the chain code.
        key_bytes = hmac_result[:32]
        chain_code = hmac_result[32:]

        private_key = PrivateKey(key_bytes)

        return cls(private_key, chain_code, depth=0)

    def derive(self, path: str) -> HDKey:
        """
        Derive child key from path notation (e.g., "m/84'/0'/0'/0/0").

        A trailing ' or h on a component marks hardened derivation. Empty
        components (doubled or trailing slashes) are ignored.

        Args:
            path: Derivation path whose first component is exactly "m".

        Returns:
            The key reached by applying every component in order; ``self``
            when the path has no components.

        Raises:
            ValueError: If the path does not start with "m", a component is
                not a plain decimal number with at most one hardened marker,
                or an index is outside 0..2**31-1.
        """
        root, *parts = path.split("/")
        if root != "m":
            raise ValueError("Path must start with 'm'")

        key = self

        for part in parts:
            if not part:
                continue

            hardened = part[-1] in ("'", "h")
            # Strip exactly one marker so inputs like "0'h" are rejected below.
            index_str = part[:-1] if hardened else part
            if not (index_str.isascii() and index_str.isdigit()):
                raise ValueError(f"Invalid path component: {part!r}")

            index = int(index_str)
            # Indices at or above 2**31 would collide with the hardened range
            # (or overflow the 4-byte serialization once the bit is added).
            if index >= 0x80000000:
                raise ValueError(f"Path index out of range: {part!r}")

            if hardened:
                index += 0x80000000

            key = key._derive_child(index)

        return key

    def _derive_child(self, index: int) -> HDKey:
        """
        Derive a child key at the given index.

        Indices >= 0x80000000 use hardened derivation (HMAC over the private
        key); lower indices HMAC over the compressed public key.

        Raises:
            ValueError: If the derived offset is not below the curve order or
                the resulting child key is zero (both invalid per BIP32).
        """
        hardened = index >= 0x80000000

        if hardened:
            priv_bytes = self._private_key.secret
            data = b"\x00" + priv_bytes + index.to_bytes(4, "big")
        else:
            pub_bytes = self._public_key.format(compressed=True)
            data = pub_bytes + index.to_bytes(4, "big")

        hmac_result = hmac.new(self.chain_code, data, hashlib.sha512).digest()
        key_offset = hmac_result[:32]
        child_chain = hmac_result[32:]

        parent_key_int = int.from_bytes(self._private_key.secret, "big")
        offset_int = int.from_bytes(key_offset, "big")

        child_key_int = (parent_key_int + offset_int) % SECP256K1_N

        # BIP32: the child is invalid when the offset is >= n or the key is 0.
        if offset_int >= SECP256K1_N or child_key_int == 0:
            raise ValueError("Invalid child key")

        child_key_bytes = child_key_int.to_bytes(32, "big")
        child_private_key = PrivateKey(child_key_bytes)

        return HDKey(
            child_private_key,
            child_chain,
            depth=self.depth + 1,
            parent_fingerprint=self.fingerprint,
            child_number=index,
        )

    def get_private_key_bytes(self) -> bytes:
        """Get private key as 32 bytes"""
        return self._private_key.secret

    def get_public_key_bytes(self, compressed: bool = True) -> bytes:
        """Get public key bytes"""
        return self._public_key.format(compressed=compressed)

    def get_address(self, network: str = "mainnet") -> str:
        """Get P2WPKH (Native SegWit) address for this key"""
        from jmwallet.wallet.address import pubkey_to_p2wpkh_address

        pubkey_hex = self.get_public_key_bytes(compressed=True).hex()
        return pubkey_to_p2wpkh_address(pubkey_hex, network)

    def sign(self, message: bytes) -> bytes:
        """Sign a message with this key (uses SHA256 hashing)."""
        return self._private_key.sign(message)

    def _serialize_extended(self, version: bytes, key_data: bytes) -> str:
        """Build the BIP32 extended-key payload and Base58Check-encode it."""
        # BIP32 serialization format:
        # 4 bytes: version
        # 1 byte: depth
        # 4 bytes: parent fingerprint
        # 4 bytes: child number
        # 32 bytes: chain code
        # 33 bytes: key data (compressed pubkey, or 0x00 + private key)
        depth_byte = min(self.depth, 255).to_bytes(1, "big")  # depth is clamped to one byte
        child_num_bytes = self.child_number.to_bytes(4, "big")

        payload = (
            version
            + depth_byte
            + self.parent_fingerprint
            + child_num_bytes
            + self.chain_code
            + key_data
        )

        return _base58check_encode(payload)

    def get_xpub(self, network: str = "mainnet") -> str:
        """
        Serialize the public key as an extended public key (xpub/tpub).

        This produces a standard BIP32 xpub that can be used in Bitcoin Core
        descriptors. The descriptor wrapper (wpkh, wsh, etc.) determines the
        actual address type.

        Args:
            network: "mainnet" for xpub; any other value ("testnet",
                "regtest", ...) selects the testnet version bytes (tpub)

        Returns:
            Base58Check-encoded extended public key (xpub or tpub)
        """
        version = XPUB_MAINNET if network == "mainnet" else XPUB_TESTNET
        return self._serialize_extended(version, self._public_key.format(compressed=True))

    def get_xprv(self, network: str = "mainnet") -> str:
        """
        Serialize the private key as an extended private key (xprv/tprv).

        Args:
            network: "mainnet" for xprv; any other value ("testnet",
                "regtest", ...) selects the testnet version bytes (tprv)

        Returns:
            Base58Check-encoded extended private key
        """
        version = XPRV_MAINNET if network == "mainnet" else XPRV_TESTNET
        # Private key is prefixed with 0x00 to make it 33 bytes
        return self._serialize_extended(version, b"\x00" + self._private_key.secret)

Hierarchical Deterministic Key for Bitcoin. Implements BIP32 derivation.

Static methods

def from_seed(seed: bytes) ‑> HDKey

Create master HD key from seed

Instance variables

prop fingerprint : bytes
Expand source code
@property
def fingerprint(self) -> bytes:
    """Return the first 4 bytes of RIPEMD160(SHA256(compressed public key))."""
    compressed_pubkey = self._public_key.format(compressed=True)
    inner_digest = hashlib.sha256(compressed_pubkey).digest()
    hash160 = hashlib.new("ripemd160", inner_digest).digest()
    # Only the leading four bytes are used as the fingerprint.
    return hash160[:4]

Get the fingerprint of this key (first 4 bytes of hash160 of public key).

prop private_key : PrivateKey
Expand source code
@property
def private_key(self) -> PrivateKey:
    """Expose the wrapped coincurve PrivateKey object (read-only accessor)."""
    wrapped_key = self._private_key
    return wrapped_key

Return the coincurve PrivateKey instance.

prop public_key : PublicKey
Expand source code
@property
def public_key(self) -> PublicKey:
    """Expose the wrapped coincurve PublicKey object (read-only accessor)."""
    wrapped_key = self._public_key
    return wrapped_key

Return the coincurve PublicKey instance.

Methods

def derive(self, path: str) ‑> HDKey
Expand source code
def derive(self, path: str) -> HDKey:
    """
    Derive child key from path notation (e.g., "m/84'/0'/0'/0/0").

    A trailing ' or h on a component marks hardened derivation. Empty
    components (doubled or trailing slashes) are ignored.

    Args:
        path: Derivation path whose first component is exactly "m".

    Returns:
        The key reached by applying every component in order; ``self``
        when the path has no components.

    Raises:
        ValueError: If the path does not start with "m", a component is
            not a plain decimal number with at most one hardened marker,
            or an index is outside 0..2**31-1.
    """
    root, *parts = path.split("/")
    if root != "m":
        raise ValueError("Path must start with 'm'")

    key = self

    for part in parts:
        if not part:
            continue

        hardened = part[-1] in ("'", "h")
        # Strip exactly one marker so inputs like "0'h" are rejected below.
        index_str = part[:-1] if hardened else part
        if not (index_str.isascii() and index_str.isdigit()):
            raise ValueError(f"Invalid path component: {part!r}")

        index = int(index_str)
        # Indices at or above 2**31 would collide with the hardened range
        # (or overflow the 4-byte serialization once the bit is added).
        if index >= 0x80000000:
            raise ValueError(f"Path index out of range: {part!r}")

        if hardened:
            index += 0x80000000

        key = key._derive_child(index)

    return key

Derive child key from path notation (e.g., "m/84'/0'/0'/0/0") ' indicates hardened derivation

def get_address(self, network: str = 'mainnet') ‑> str
Expand source code
def get_address(self, network: str = "mainnet") -> str:
    """Return the P2WPKH (native SegWit) address of this key on ``network``."""
    # Imported at call time, as in the original implementation.
    from jmwallet.wallet.address import pubkey_to_p2wpkh_address

    compressed_pubkey = self.get_public_key_bytes(compressed=True)
    return pubkey_to_p2wpkh_address(compressed_pubkey.hex(), network)

Get P2WPKH (Native SegWit) address for this key

def get_private_key_bytes(self) ‑> bytes
Expand source code
def get_private_key_bytes(self) -> bytes:
    """Return the raw secret of the wrapped private key (32 bytes)."""
    secret_bytes = self._private_key.secret
    return secret_bytes

Get private key as 32 bytes

def get_public_key_bytes(self, compressed: bool = True) ‑> bytes
Expand source code
def get_public_key_bytes(self, compressed: bool = True) -> bytes:
    """
    Return the serialized public key.

    Args:
        compressed: Forwarded to the public key's ``format``; True by default.
    """
    encoded = self._public_key.format(compressed=compressed)
    return encoded

Get public key bytes

def get_xprv(self, network: str = 'mainnet') ‑> str
Expand source code
def get_xprv(self, network: str = "mainnet") -> str:
    """
    Serialize this key as a BIP32 extended private key (xprv/tprv).

    Args:
        network: "mainnet" selects the xprv version bytes; every other
            value (e.g. "testnet", "regtest") selects the tprv version bytes

    Returns:
        Base58Check-encoded extended private key
    """
    version = XPRV_MAINNET if network == "mainnet" else XPRV_TESTNET

    # Depth is clamped so that it always fits in a single byte.
    depth_field = min(self.depth, 255).to_bytes(1, "big")
    child_field = self.child_number.to_bytes(4, "big")
    # A leading 0x00 pads the 32-byte secret to the 33-byte key field.
    key_field = b"\x00" + self._private_key.secret

    header = version + depth_field + self.parent_fingerprint + child_field
    return _base58check_encode(header + self.chain_code + key_field)

Serialize the private key as an extended private key (xprv/tprv).

Args

network
"mainnet" for xprv, "testnet"/"regtest" for tprv

Returns

Base58Check-encoded extended private key

def get_xpub(self, network: str = 'mainnet') ‑> str
Expand source code
def get_xpub(self, network: str = "mainnet") -> str:
    """
    Serialize this key as a BIP32 extended public key (xpub/tpub).

    The result is a standard BIP32 xpub usable in Bitcoin Core descriptors;
    the descriptor wrapper (wpkh, wsh, etc.) decides the address type.

    Args:
        network: "mainnet" selects the xpub version bytes; every other
            value (e.g. "testnet", "regtest") selects the tpub version bytes

    Returns:
        Base58Check-encoded extended public key (xpub or tpub)
    """
    version = XPUB_MAINNET if network == "mainnet" else XPUB_TESTNET

    # Layout: version(4) | depth(1) | parent fingerprint(4) | child number(4)
    #         | chain code(32) | compressed public key(33)
    # Depth is clamped so that it always fits in a single byte.
    depth_field = min(self.depth, 255).to_bytes(1, "big")
    child_field = self.child_number.to_bytes(4, "big")
    key_field = self._public_key.format(compressed=True)

    header = version + depth_field + self.parent_fingerprint + child_field
    return _base58check_encode(header + self.chain_code + key_field)

Serialize the public key as an extended public key (xpub/tpub).

This produces a standard BIP32 xpub that can be used in Bitcoin Core descriptors. The descriptor wrapper (wpkh, wsh, etc.) determines the actual address type.

Args

network
"mainnet" for xpub, "testnet"/"regtest" for tpub

Returns

Base58Check-encoded extended public key (xpub or tpub)

def sign(self, message: bytes) ‑> bytes
Expand source code
def sign(self, message: bytes) -> bytes:
    """
    Sign ``message`` with the wrapped private key.

    Delegates to the private key's ``sign`` method with its default
    arguments (documented upstream as hashing the message with SHA256).
    """
    signature = self._private_key.sign(message)
    return signature

Sign a message with this key (uses SHA256 hashing).

class UTXOInfo (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class UTXOInfo:
    """Extended UTXO information with wallet context"""

    txid: str
    vout: int
    value: int
    address: str
    confirmations: int
    scriptpubkey: str
    path: str
    mixdepth: int
    # Block height where the UTXO was confirmed (for Neutrino)
    height: int | None = None
    # Locktime for fidelity bond UTXOs; None for regular UTXOs
    locktime: int | None = None

    @property
    def is_timelocked(self) -> bool:
        """True when a locktime is set, i.e. this is a fidelity bond UTXO."""
        return not (self.locktime is None)

    @property
    def is_p2wsh(self) -> bool:
        """True when the hex scriptpubkey has the P2WSH shape."""
        # OP_0 (00) + PUSH32 (20) + 32-byte hash = 34 bytes = 68 hex chars
        script = self.scriptpubkey
        return len(script) == 68 and script.startswith("0020")

    @property
    def is_p2wpkh(self) -> bool:
        """True when the hex scriptpubkey has the P2WPKH shape."""
        # OP_0 (00) + PUSH20 (14) + 20-byte hash = 22 bytes = 44 hex chars
        script = self.scriptpubkey
        return len(script) == 44 and script.startswith("0014")

Extended UTXO information with wallet context

Instance variables

var address : str

The type of the None singleton.

var confirmations : int

The type of the None singleton.

var height : int | None

Block height where the UTXO was confirmed (for Neutrino). Defaults to None.

prop is_p2wpkh : bool
Expand source code
@property
def is_p2wpkh(self) -> bool:
    """True when the hex scriptpubkey has the P2WPKH shape."""
    # OP_0 (00) + PUSH20 (14) + 20-byte hash = 22 bytes = 44 hex chars
    script = self.scriptpubkey
    return len(script) == 44 and script.startswith("0014")

Check if this UTXO is P2WPKH based on scriptpubkey.

prop is_p2wsh : bool
Expand source code
@property
def is_p2wsh(self) -> bool:
    """True when the hex scriptpubkey has the P2WSH shape."""
    # OP_0 (00) + PUSH32 (20) + 32-byte hash = 34 bytes = 68 hex chars
    script = self.scriptpubkey
    return len(script) == 68 and script.startswith("0020")

Check if this UTXO is P2WSH based on scriptpubkey.

prop is_timelocked : bool
Expand source code
@property
def is_timelocked(self) -> bool:
    """True when a locktime is set, i.e. this is a fidelity bond UTXO."""
    return not (self.locktime is None)

Check if this is a timelocked (fidelity bond) UTXO.

var locktime : int | None

Locktime for fidelity bond UTXOs; None for regular UTXOs.

var mixdepth : int

The type of the None singleton.

var path : str

The type of the None singleton.

var scriptpubkey : str

The type of the None singleton.

var txid : str

The type of the None singleton.

var value : int

The type of the None singleton.

var vout : int

The type of the None singleton.

class WalletService (mnemonic: str,
backend: BlockchainBackend,
network: str = 'mainnet',
mixdepth_count: int = 5,
gap_limit: int = 20,
data_dir: Path | None = None)
Expand source code
class WalletService:
    """
    JoinMarket wallet service.
    Manages BIP84 hierarchical deterministic wallet with mixdepths.

    Derivation path: m/84'/0'/{mixdepth}'/{change}/{index}
    - mixdepth: 0-4 (JoinMarket isolation levels)
    - change: 0 (external/receive), 1 (internal/change)
    - index: address index
    """

    def __init__(
        self,
        mnemonic: str,
        backend: BlockchainBackend,
        network: str = "mainnet",
        mixdepth_count: int = 5,
        gap_limit: int = 20,
        data_dir: Path | None = None,
    ):
        """
        Derive the master key from the mnemonic and set up empty caches.

        Args:
            mnemonic: Mnemonic phrase the wallet seed is derived from
            backend: Blockchain backend used for UTXO lookups
            network: Network name; "mainnet" selects coin type 0, anything
                else selects coin type 1
            mixdepth_count: Number of mixdepths (accounts) managed by the wallet
            gap_limit: Number of consecutive empty addresses that ends a scan
            data_dir: Optional data directory; when set it is consulted for
                CoinJoin history when choosing fresh addresses
        """
        self.mnemonic = mnemonic
        self.backend = backend
        self.network = network
        self.mixdepth_count = mixdepth_count
        self.gap_limit = gap_limit
        self.data_dir = data_dir

        seed = mnemonic_to_seed(mnemonic)
        self.master_key = HDKey.from_seed(seed)

        coin_type = 0 if network == "mainnet" else 1
        self.root_path = f"m/84'/{coin_type}'"

        # address -> (mixdepth, change, index) for every address derived so far
        self.address_cache: dict[str, tuple[int, int, int]] = {}
        # mixdepth -> UTXOs found by the most recent sync
        self.utxo_cache: dict[int, list[UTXOInfo]] = {}
        # fidelity bond address -> locktime. Created up front so the attribute
        # always exists; the hasattr() guards in other methods stay valid.
        self.fidelity_bond_locktime_cache: dict[str, int] = {}

        logger.info(f"Initialized wallet with {mixdepth_count} mixdepths")

    def get_address(self, mixdepth: int, change: int, index: int) -> str:
        """
        Derive the address at ``{root_path}/{mixdepth}'/{change}/{index}``.

        The derived address is recorded in ``address_cache`` so it can later
        be mapped back to its (mixdepth, change, index) triple.

        Args:
            mixdepth: Mixdepth (account) number, 0 <= mixdepth < mixdepth_count
            change: Branch number (0 = external/receive, 1 = internal/change)
            index: Address index within the branch

        Returns:
            Address string for the wallet's network

        Raises:
            ValueError: If mixdepth is negative or not below mixdepth_count
        """
        # A negative mixdepth would otherwise be formatted straight into the
        # derivation path instead of being rejected like an oversized one.
        if mixdepth < 0:
            raise ValueError(f"Mixdepth {mixdepth} must be non-negative")
        if mixdepth >= self.mixdepth_count:
            raise ValueError(f"Mixdepth {mixdepth} exceeds maximum {self.mixdepth_count}")

        path = f"{self.root_path}/{mixdepth}'/{change}/{index}"
        key = self.master_key.derive(path)
        address = key.get_address(self.network)

        self.address_cache[address] = (mixdepth, change, index)

        return address

    def get_receive_address(self, mixdepth: int, index: int) -> str:
        """Return the address at ``index`` on the external (receive) branch, change=0."""
        external_branch = 0
        return self.get_address(mixdepth, external_branch, index)

    def get_change_address(self, mixdepth: int, index: int) -> str:
        """Return the address at ``index`` on the internal (change) branch, change=1."""
        internal_branch = 1
        return self.get_address(mixdepth, internal_branch, index)

    def get_account_xpub(self, mixdepth: int) -> str:
        """
        Return the account-level extended public key for a mixdepth.

        The key is derived at ``{root_path}/{mixdepth}'`` and serialized for
        the wallet's network, which makes it usable inside Bitcoin Core
        descriptors for efficient scanning.

        Args:
            mixdepth: The mixdepth (account) number (0-4)

        Returns:
            xpub/tpub string for the account
        """
        return self.master_key.derive(f"{self.root_path}/{mixdepth}'").get_xpub(self.network)

    def get_scan_descriptors(self, scan_range: int = DEFAULT_SCAN_RANGE) -> list[dict[str, Any]]:
        """
        Build ranged wpkh() descriptors covering every mixdepth of the wallet.

        For each mixdepth two descriptors are produced from the account xpub:
        one for the external (receive) branch ``/0/*`` and one for the
        internal (change) branch ``/1/*``. Ranged descriptors let Bitcoin Core
        cover a whole index range in one pass over the UTXO set rather than
        scanning addresses one by one.

        Args:
            scan_range: Number of indices to cover; the emitted range is
                [0, scan_range - 1]

        Returns:
            List of descriptor dicts for use with scantxoutset:
            [{"desc": "wpkh(xpub.../0/*)", "range": [0, 999]}, ...]
        """
        last_index = scan_range - 1
        descriptors = []

        for mixdepth in range(self.mixdepth_count):
            xpub = self.get_account_xpub(mixdepth)
            # Branch 0 is external (receive), branch 1 is internal (change)
            for branch in (0, 1):
                descriptors.append({"desc": f"wpkh({xpub}/{branch}/*)", "range": [0, last_index]})

        logger.debug(
            f"Generated {len(descriptors)} descriptors for {self.mixdepth_count} mixdepths "
            f"with range [0, {scan_range - 1}]"
        )
        return descriptors

    def get_fidelity_bond_key(self, index: int, locktime: int) -> HDKey:
        """
        Derive the HD key backing a fidelity bond.

        Bonds live at ``{root_path}/0'/{FIDELITY_BOND_BRANCH}/{index}``. The
        locktime does not take part in the derivation; the parameter is
        accepted but not used here.

        Args:
            index: Address index within the fidelity bond branch
            locktime: Unix timestamp for the timelock (not part of the path)

        Returns:
            HDKey for the fidelity bond
        """
        # Fidelity bonds are always derived under mixdepth 0
        bond_path = "/".join((self.root_path, "0'", f"{FIDELITY_BOND_BRANCH}", f"{index}"))
        return self.master_key.derive(bond_path)

    def get_fidelity_bond_address(self, index: int, locktime: int) -> str:
        """
        Build the P2WSH address of a fidelity bond and remember its metadata.

        The bond key's compressed public key and the locktime are turned into
        a freeze script via ``mk_freeze_script`` and wrapped in P2WSH. The
        address is stored in ``address_cache`` as (0, FIDELITY_BOND_BRANCH,
        index) and its locktime in ``fidelity_bond_locktime_cache``.

        Args:
            index: Address index within the fidelity bond branch
            locktime: Unix timestamp for the timelock

        Returns:
            P2WSH address for the fidelity bond
        """
        bond_key = self.get_fidelity_bond_key(index, locktime)
        pubkey_hex = bond_key.get_public_key_bytes(compressed=True).hex()
        freeze_script = mk_freeze_script(pubkey_hex, locktime)
        address = script_to_p2wsh_address(freeze_script, self.network)

        # The address cache only holds (mixdepth, branch, index); the locktime
        # goes into a dedicated cache that is created on first use.
        self.address_cache[address] = (0, FIDELITY_BOND_BRANCH, index)
        try:
            locktime_cache = self.fidelity_bond_locktime_cache
        except AttributeError:
            locktime_cache = self.fidelity_bond_locktime_cache = {}
        locktime_cache[address] = locktime

        logger.debug(f"Created fidelity bond address {address} with locktime {locktime}")
        return address

    def get_fidelity_bond_script(self, index: int, locktime: int) -> bytes:
        """
        Build the timelock script of a fidelity bond.

        Args:
            index: Address index within the fidelity bond branch
            locktime: Unix timestamp for the timelock

        Returns:
            Timelock redeem script bytes
        """
        pubkey_bytes = self.get_fidelity_bond_key(index, locktime).get_public_key_bytes(
            compressed=True
        )
        return mk_freeze_script(pubkey_bytes.hex(), locktime)

    def get_locktime_for_address(self, address: str) -> int | None:
        """
        Look up the locktime recorded for a fidelity bond address.

        Args:
            address: The fidelity bond address

        Returns:
            Locktime as Unix timestamp, or None when the address is unknown
            or no fidelity bond has been recorded yet
        """
        try:
            locktime_cache = self.fidelity_bond_locktime_cache
        except AttributeError:
            # No bond has been registered, so the cache was never created
            return None
        return locktime_cache.get(address)

    def get_private_key(self, mixdepth: int, change: int, index: int) -> bytes:
        """Return the private key bytes at ``{root_path}/{mixdepth}'/{change}/{index}``."""
        return self.master_key.derive(
            f"{self.root_path}/{mixdepth}'/{change}/{index}"
        ).get_private_key_bytes()

    def get_key_for_address(self, address: str) -> HDKey | None:
        """Return the HD key of a previously derived address, or None if it is unknown."""
        try:
            mixdepth, change, index = self.address_cache[address]
        except KeyError:
            # Only addresses that have been derived (and cached) can be resolved
            return None
        return self.master_key.derive(f"{self.root_path}/{mixdepth}'/{change}/{index}")

    async def sync_mixdepth(self, mixdepth: int) -> list[UTXOInfo]:
        """
        Scan one mixdepth for UTXOs and store the result in ``utxo_cache``.

        Both branches (0 = external, 1 = internal) are walked in batches of
        ``gap_limit`` addresses. A branch is finished once ``gap_limit``
        consecutive addresses hold no UTXO.

        Args:
            mixdepth: Mixdepth to scan

        Returns:
            List of UTXOs found in the mixdepth
        """
        utxos: list[UTXOInfo] = []

        for change in (0, 1):
            consecutive_empty = 0
            index = 0

            while consecutive_empty < self.gap_limit:
                # One backend round-trip per batch of gap_limit addresses
                batch_size = self.gap_limit
                addresses = [
                    self.get_address(mixdepth, change, index + offset)
                    for offset in range(batch_size)
                ]

                backend_utxos = await self.backend.get_utxos(addresses)

                # Bucket the results by address; anything the backend returns
                # for an address outside this batch is ignored.
                utxos_by_address: dict[str, list] = {addr: [] for addr in addresses}
                for found in backend_utxos:
                    bucket = utxos_by_address.get(found.address)
                    if bucket is not None:
                        bucket.append(found)

                # Walk the batch in index order so the empty streak is exact
                for offset, address in enumerate(addresses):
                    addr_utxos = utxos_by_address[address]

                    if not addr_utxos:
                        consecutive_empty += 1
                    else:
                        consecutive_empty = 0
                        path = f"{self.root_path}/{mixdepth}'/{change}/{index + offset}"
                        for found in addr_utxos:
                            utxos.append(
                                UTXOInfo(
                                    txid=found.txid,
                                    vout=found.vout,
                                    value=found.value,
                                    address=address,
                                    confirmations=found.confirmations,
                                    scriptpubkey=found.scriptpubkey,
                                    path=path,
                                    mixdepth=mixdepth,
                                    height=found.height,
                                )
                            )

                    if consecutive_empty >= self.gap_limit:
                        break

                index += batch_size

            logger.debug(
                f"Synced mixdepth {mixdepth} change {change}: "
                f"scanned ~{index} addresses, found "
                f"{len([u for u in utxos if u.path.split('/')[-2] == str(change)])} UTXOs"
            )

        self.utxo_cache[mixdepth] = utxos
        return utxos

    async def sync_fidelity_bonds(self, locktimes: list[int]) -> list[UTXOInfo]:
        """
        Sync fidelity bond UTXOs with specific locktimes.

        Fidelity bonds use mixdepth 0, branch 2, with path format:
        m/84'/coin'/0'/2/index:locktime

        For every distinct locktime the bond branch is scanned in batches of
        ``gap_limit`` addresses until ``gap_limit`` consecutive addresses are
        empty. Found UTXOs are merged into the mixdepth 0 cache keyed by
        outpoint, so calling this method repeatedly (or passing the same
        locktime twice) never leaves duplicate entries in the cache.

        Args:
            locktimes: List of Unix timestamps to scan for

        Returns:
            List of fidelity bond UTXOs found
        """
        utxos: list[UTXOInfo] = []

        if not locktimes:
            logger.debug("No locktimes provided for fidelity bond sync")
            return utxos

        # dict.fromkeys drops repeated locktimes while keeping the caller's order
        for locktime in dict.fromkeys(locktimes):
            consecutive_empty = 0
            index = 0

            while consecutive_empty < self.gap_limit:
                # Generate addresses for this locktime
                addresses = [
                    self.get_fidelity_bond_address(index + i, locktime)
                    for i in range(self.gap_limit)
                ]

                # Fetch UTXOs
                backend_utxos = await self.backend.get_utxos(addresses)

                # Group by address
                utxos_by_address: dict[str, list] = {addr: [] for addr in addresses}
                for utxo in backend_utxos:
                    if utxo.address in utxos_by_address:
                        utxos_by_address[utxo.address].append(utxo)

                # Process results in index order so the empty streak is exact
                for i, address in enumerate(addresses):
                    addr_utxos = utxos_by_address[address]

                    if addr_utxos:
                        consecutive_empty = 0
                        # Path includes locktime notation
                        path = f"{self.root_path}/0'/{FIDELITY_BOND_BRANCH}/{index + i}:{locktime}"
                        for utxo in addr_utxos:
                            utxo_info = UTXOInfo(
                                txid=utxo.txid,
                                vout=utxo.vout,
                                value=utxo.value,
                                address=address,
                                confirmations=utxo.confirmations,
                                scriptpubkey=utxo.scriptpubkey,
                                path=path,
                                mixdepth=0,  # Fidelity bonds always in mixdepth 0
                                height=utxo.height,
                                locktime=locktime,  # Store locktime for P2WSH signing
                            )
                            utxos.append(utxo_info)
                            logger.info(
                                f"Found fidelity bond UTXO: {utxo.txid}:{utxo.vout} "
                                f"value={utxo.value} locktime={locktime}"
                            )
                    else:
                        consecutive_empty += 1

                    if consecutive_empty >= self.gap_limit:
                        break

                index += self.gap_limit

        # Merge fidelity bond UTXOs into the mixdepth 0 cache
        if utxos:
            found_outpoints = {(u.txid, u.vout) for u in utxos}
            cached = self.utxo_cache.setdefault(0, [])
            # Replace stale copies of the same outpoints instead of appending
            # blindly. The list is updated in place so existing references to
            # it stay valid.
            cached[:] = [
                u for u in cached if (u.txid, u.vout) not in found_outpoints
            ] + utxos
            logger.info(f"Found {len(utxos)} fidelity bond UTXOs")

        return utxos

    async def sync_all(
        self, fidelity_bond_addresses: list[tuple[str, int, int]] | None = None
    ) -> dict[int, list[UTXOInfo]]:
        """
        Sync every mixdepth, preferring a single descriptor scan when possible.

        When the backend exposes ``scan_descriptors`` the whole wallet (plus
        any fidelity bond addresses) is scanned in one pass. If that is not
        available, or the scan reports failure, each mixdepth is synced
        address by address instead.

        Args:
            fidelity_bond_addresses: Optional list of (address, locktime, index) tuples
                                    for fidelity bonds to scan with wallet descriptors

        Returns:
            Dictionary mapping mixdepth to list of UTXOs
        """
        logger.info("Syncing all mixdepths...")

        # Try efficient descriptor-based sync if backend supports it
        if hasattr(self.backend, "scan_descriptors"):
            scanned = await self._sync_all_with_descriptors(fidelity_bond_addresses)
            if scanned is not None:
                return scanned
            logger.warning("Descriptor scan failed, falling back to address scan")

        # Legacy address-by-address scanning, one mixdepth after another.
        # NOTE(review): fidelity_bond_addresses is not used on this path, so
        # bond UTXOs are only picked up by the descriptor scan — confirm that
        # callers run sync_fidelity_bonds() themselves when needed.
        result = {
            mixdepth: await self.sync_mixdepth(mixdepth)
            for mixdepth in range(self.mixdepth_count)
        }
        logger.info(f"Sync complete: {sum(len(u) for u in result.values())} total UTXOs")
        return result

    async def _sync_all_with_descriptors(
        self, fidelity_bond_addresses: list[tuple[str, int, int]] | None = None
    ) -> dict[int, list[UTXOInfo]] | None:
        """
        Sync the whole wallet with one descriptor scan of the UTXO set.

        Ranged xpub descriptors for every mixdepth/branch, plus one ``addr()``
        descriptor per fidelity bond address, are handed to the backend's
        ``scan_descriptors``. Each returned unspent output is then mapped back
        to its wallet position and stored in ``utxo_cache``, which is replaced
        wholesale by the result.

        Args:
            fidelity_bond_addresses: Optional list of (address, locktime, index) tuples to scan
                                    in the same pass as wallet descriptors

        Returns:
            Dictionary mapping mixdepth to list of UTXOInfo, or None when the
            block height cannot be fetched or the scan does not report success
        """
        scan_range = max(DEFAULT_SCAN_RANGE, self.gap_limit * 10)
        last_index = scan_range - 1

        descriptors: list[str | dict[str, Any]] = []
        # descriptor string (without checksum) -> (mixdepth, change)
        desc_to_path: dict[str, tuple[int, int]] = {}
        # fidelity bond address -> (locktime, index)
        bond_address_to_info: dict[str, tuple[int, int]] = {}

        for mixdepth in range(self.mixdepth_count):
            xpub = self.get_account_xpub(mixdepth)
            # Branch 0 = external (receive) .../0/*, branch 1 = internal (change) .../1/*
            for branch in (0, 1):
                ranged_desc = f"wpkh({xpub}/{branch}/*)"
                descriptors.append({"desc": ranged_desc, "range": [0, last_index]})
                desc_to_path[ranged_desc] = (mixdepth, branch)

        if fidelity_bond_addresses:
            logger.info(
                f"Including {len(fidelity_bond_addresses)} fidelity bond address(es) in scan"
            )
            # The locktime cache is created lazily on first use
            if not hasattr(self, "fidelity_bond_locktime_cache"):
                self.fidelity_bond_locktime_cache = {}
            locktime_cache = self.fidelity_bond_locktime_cache

            for address, locktime, index in fidelity_bond_addresses:
                descriptors.append(f"addr({address})")
                bond_address_to_info[address] = (locktime, index)
                # Record the index supplied by the caller for this bond address
                self.address_cache[address] = (0, FIDELITY_BOND_BRANCH, index)
                locktime_cache[address] = locktime

        # The chain tip is needed to turn UTXO heights into confirmation counts
        try:
            tip_height = await self.backend.get_block_height()
        except Exception as e:
            logger.error(f"Failed to get block height for descriptor scan: {e}")
            return None

        scan_result = await self.backend.scan_descriptors(descriptors)
        if not scan_result or not scan_result.get("success", False):
            return None

        result: dict[int, list[UTXOInfo]] = {md: [] for md in range(self.mixdepth_count)}
        fidelity_bond_utxos: list[UTXOInfo] = []

        for utxo_data in scan_result.get("unspents", []):
            desc = utxo_data.get("desc", "")

            # Strip the "#checksum" suffix, if any
            desc_base = desc.split("#")[0]

            # Fidelity bond hits come back as addr(<address>)
            bond_info = None
            if desc_base.startswith("addr(") and desc_base.endswith(")"):
                bond_address = desc_base[5:-1]
                bond_info = bond_address_to_info.get(bond_address)

            if bond_info is not None:
                locktime, index = bond_info
                utxo_height = utxo_data.get("height", 0)
                confirmations = tip_height - utxo_height + 1 if utxo_height > 0 else 0

                # Path format for fidelity bonds: m/84'/0'/0'/2/index:locktime
                path = f"{self.root_path}/0'/{FIDELITY_BOND_BRANCH}/{index}:{locktime}"

                utxo_info = UTXOInfo(
                    txid=utxo_data["txid"],
                    vout=utxo_data["vout"],
                    value=btc_to_sats(utxo_data["amount"]),
                    address=bond_address,
                    confirmations=confirmations,
                    scriptpubkey=utxo_data.get("scriptPubKey", ""),
                    path=path,
                    mixdepth=0,  # Fidelity bonds in mixdepth 0
                    height=utxo_height if utxo_height > 0 else None,
                    locktime=locktime,
                )
                fidelity_bond_utxos.append(utxo_info)
                logger.info(
                    f"Found fidelity bond UTXO: {utxo_info.txid}:{utxo_info.vout} "
                    f"value={utxo_info.value} locktime={locktime} index={index}"
                )
                continue

            # Regular wallet UTXO: the descriptor has the form
            # wpkh([fingerprint/change/index]pubkey)#checksum
            path_info = self._parse_descriptor_path(desc, desc_to_path)
            if path_info is None:
                logger.warning(f"Could not parse path from descriptor: {desc}")
                continue

            mixdepth, change, index = path_info

            utxo_height = utxo_data.get("height", 0)
            confirmations = tip_height - utxo_height + 1 if utxo_height > 0 else 0

            # Deriving the address also records it in the address cache
            address = self.get_address(mixdepth, change, index)
            path = f"{self.root_path}/{mixdepth}'/{change}/{index}"

            result[mixdepth].append(
                UTXOInfo(
                    txid=utxo_data["txid"],
                    vout=utxo_data["vout"],
                    value=btc_to_sats(utxo_data["amount"]),
                    address=address,
                    confirmations=confirmations,
                    scriptpubkey=utxo_data.get("scriptPubKey", ""),
                    path=path,
                    mixdepth=mixdepth,
                    height=utxo_height if utxo_height > 0 else None,
                )
            )

        # Fidelity bond UTXOs are listed under mixdepth 0
        if fidelity_bond_utxos:
            result[0].extend(fidelity_bond_utxos)

        self.utxo_cache = result

        total_utxos = sum(len(bucket) for bucket in result.values())
        total_value = sum(u.value for bucket in result.values() for u in bucket)
        bond_count = len(fidelity_bond_utxos)
        if bond_count > 0:
            logger.info(
                f"Descriptor sync complete: {total_utxos} UTXOs "
                f"({bond_count} fidelity bond(s)), {format_amount(total_value)} total"
            )
        else:
            logger.info(
                f"Descriptor sync complete: {total_utxos} UTXOs, {format_amount(total_value)} total"
            )

        return result

    def _parse_descriptor_path(
        self, desc: str, desc_to_path: dict[str, tuple[int, int]]
    ) -> tuple[int, int, int] | None:
        """
        Parse a descriptor to extract mixdepth, change, and index.

        When using xpub descriptors, Bitcoin Core returns a descriptor showing
        the path RELATIVE to the xpub we provided:
        wpkh([fingerprint/change/index]pubkey)#checksum

        We need to match this back to the original descriptor to determine mixdepth.

        Args:
            desc: Descriptor string from scantxoutset result
            desc_to_path: Mapping of descriptor (without checksum) to (mixdepth, change)

        Returns:
            Tuple of (mixdepth, change, index) or None if parsing fails
        """
        import re

        # Remove checksum
        if "#" in desc:
            desc_base = desc.split("#")[0]
        else:
            desc_base = desc

        # Extract the relative path [fingerprint/change/index] and pubkey
        # Pattern: wpkh([fingerprint/change/index]pubkey)
        match = re.search(r"wpkh\(\[[\da-f]+/(\d+)/(\d+)\]([\da-f]+)\)", desc_base, re.I)
        if not match:
            return None

        change_from_desc = int(match.group(1))
        index = int(match.group(2))
        pubkey = match.group(3)

        # Find which descriptor this matches by checking all our descriptors
        # We need to derive the key and check if it matches the pubkey
        for base_desc, (mixdepth, change) in desc_to_path.items():
            if change == change_from_desc:
                # Verify by deriving the key and comparing pubkeys
                try:
                    derived_key = self.master_key.derive(
                        f"{self.root_path}/{mixdepth}'/{change}/{index}"
                    )
                    derived_pubkey = derived_key.get_public_key_bytes(compressed=True).hex()
                    if derived_pubkey == pubkey:
                        return (mixdepth, change, index)
                except Exception:
                    continue

        return None

    async def get_balance(self, mixdepth: int) -> int:
        """Return the summed value of a mixdepth's cached UTXOs, syncing it first if uncached."""
        needs_sync = mixdepth not in self.utxo_cache
        if needs_sync:
            await self.sync_mixdepth(mixdepth)

        balance = 0
        for entry in self.utxo_cache.get(mixdepth, []):
            balance += entry.value
        return balance

    async def get_utxos(self, mixdepth: int) -> list[UTXOInfo]:
        """Return the cached UTXOs of a mixdepth, running a sync first when none are cached."""
        already_cached = mixdepth in self.utxo_cache
        if not already_cached:
            await self.sync_mixdepth(mixdepth)
        cached = self.utxo_cache.get(mixdepth, [])
        return cached

    def find_utxo_by_address(self, address: str) -> UTXOInfo | None:
        """
        Locate a cached UTXO that sits on the given address.

        Mixdepths 0 .. mixdepth_count - 1 are searched in ascending order and
        the first hit wins. Useful for matching CoinJoin outputs to history
        entries.

        Args:
            address: Bitcoin address to search for

        Returns:
            UTXOInfo if found, None otherwise
        """
        for depth in range(self.mixdepth_count):
            hit = next(
                (entry for entry in self.utxo_cache.get(depth, []) if entry.address == address),
                None,
            )
            if hit is not None:
                return hit
        return None

    async def get_total_balance(self) -> int:
        """Return the sum of the balances of all mixdepths, queried in ascending order."""
        balances = [await self.get_balance(depth) for depth in range(self.mixdepth_count)]
        return sum(balances)

    def select_utxos(
        self,
        mixdepth: int,
        target_amount: int,
        min_confirmations: int = 1,
        include_utxos: list[UTXOInfo] | None = None,
        include_fidelity_bonds: bool = False,
    ) -> list[UTXOInfo]:
        """
        Select UTXOs for spending from a mixdepth.
        Uses simple greedy selection strategy (largest value first).

        Fidelity bond UTXOs are kept in the mixdepth 0 cache alongside regular
        coins. They are timelocked, so automatic selection skips them unless
        explicitly requested; UTXOs passed via ``include_utxos`` are always
        used as given.

        Args:
            mixdepth: Mixdepth to select from
            target_amount: Target amount in satoshis
            min_confirmations: Minimum confirmations required
            include_utxos: List of UTXOs that MUST be included in selection
            include_fidelity_bonds: If True, timelocked (fidelity bond) UTXOs
                may be picked by the automatic selection as well

        Returns:
            Selected UTXOs: the mandatory ones first, then automatic picks

        Raises:
            ValueError: If the eligible UTXOs cannot cover target_amount
        """
        utxos = self.utxo_cache.get(mixdepth, [])

        eligible = [
            utxo
            for utxo in utxos
            if utxo.confirmations >= min_confirmations
            and (include_fidelity_bonds or not utxo.is_timelocked)
        ]

        # Mandatory UTXOs come first and must not be picked a second time
        selected = list(include_utxos) if include_utxos else []
        if selected:
            included_txid_vout = {(u.txid, u.vout) for u in selected}
            eligible = [u for u in eligible if (u.txid, u.vout) not in included_txid_vout]

        total = sum(utxo.value for utxo in selected)
        if total >= target_amount:
            # Already enough with mandatory UTXOs
            return selected

        eligible.sort(key=lambda u: u.value, reverse=True)
        for utxo in eligible:
            selected.append(utxo)
            total += utxo.value
            if total >= target_amount:
                break

        if total < target_amount:
            raise ValueError(f"Insufficient funds: need {target_amount}, have {total}")

        return selected

    def get_all_utxos(
        self,
        mixdepth: int,
        min_confirmations: int = 1,
        include_fidelity_bonds: bool = False,
    ) -> list[UTXOInfo]:
        """
        Get all UTXOs from a mixdepth for sweep operations.

        Unlike select_utxos(), this returns ALL eligible UTXOs regardless of
        target amount. Used for sweep mode to ensure no change output.

        Fidelity bond UTXOs share the mixdepth 0 cache with regular coins.
        They are timelocked, so they are left out of the sweep set unless
        explicitly requested.

        Args:
            mixdepth: Mixdepth to get UTXOs from
            min_confirmations: Minimum confirmations required
            include_fidelity_bonds: If True, timelocked (fidelity bond) UTXOs
                are returned as well

        Returns:
            List of all eligible UTXOs in the mixdepth, in cache order
        """
        utxos = self.utxo_cache.get(mixdepth, [])
        return [
            utxo
            for utxo in utxos
            if utxo.confirmations >= min_confirmations
            and (include_fidelity_bonds or not utxo.is_timelocked)
        ]

    def select_utxos_with_merge(
        self,
        mixdepth: int,
        target_amount: int,
        min_confirmations: int = 1,
        merge_algorithm: str = "default",
        include_fidelity_bonds: bool = False,
    ) -> list[UTXOInfo]:
        """
        Select UTXOs with merge algorithm for maker UTXO consolidation.

        Unlike regular select_utxos(), this method can select MORE UTXOs than
        strictly necessary based on the merge algorithm. Since takers pay tx fees,
        makers can add extra inputs "for free" to consolidate their UTXOs.

        Fidelity bond UTXOs share the mixdepth 0 cache with regular coins.
        They are timelocked, so they are never merged in unless explicitly
        requested; otherwise "greedy" would pull the bond into a CoinJoin.

        Args:
            mixdepth: Mixdepth to select from
            target_amount: Minimum target amount in satoshis
            min_confirmations: Minimum confirmations required
            merge_algorithm: Selection strategy:
                - "default": Minimum UTXOs needed (same as select_utxos)
                - "gradual": +1 additional UTXO beyond minimum
                - "greedy": ALL eligible UTXOs from the mixdepth
                - "random": +0 to +2 additional UTXOs randomly
            include_fidelity_bonds: If True, timelocked (fidelity bond) UTXOs
                are eligible for selection as well

        Returns:
            List of selected UTXOs

        Raises:
            ValueError: If insufficient funds
        """
        import random as rand_module

        utxos = self.utxo_cache.get(mixdepth, [])
        eligible = [
            utxo
            for utxo in utxos
            if utxo.confirmations >= min_confirmations
            and (include_fidelity_bonds or not utxo.is_timelocked)
        ]

        # Sort by value descending for efficient selection
        eligible.sort(key=lambda u: u.value, reverse=True)

        # First, select minimum needed (greedy by value)
        selected = []
        total = 0

        for utxo in eligible:
            selected.append(utxo)
            total += utxo.value
            if total >= target_amount:
                break

        if total < target_amount:
            raise ValueError(f"Insufficient funds: need {target_amount}, have {total}")

        # Everything after the minimum selection is a merge candidate
        remaining = eligible[len(selected):]

        # Apply merge algorithm to add additional UTXOs
        if merge_algorithm == "greedy":
            # Add ALL remaining UTXOs
            selected.extend(remaining)
        elif merge_algorithm == "gradual" and remaining:
            # Add exactly 1 more UTXO (smallest to preserve larger ones)
            selected.append(min(remaining, key=lambda u: u.value))
        elif merge_algorithm == "random" and remaining:
            # Add 0-2 additional UTXOs randomly
            extra_count = rand_module.randint(0, min(2, len(remaining)))
            if extra_count > 0:
                # Prefer smaller UTXOs for consolidation
                remaining_sorted = sorted(remaining, key=lambda u: u.value)
                selected.extend(remaining_sorted[:extra_count])
        # "default" - no additional UTXOs

        return selected

    def get_next_address_index(self, mixdepth: int, change: int) -> int:
        """
        Return the next address index to hand out for a mixdepth/branch.

        The starting point is one past the highest index known for this
        mixdepth/branch in the address cache (cross-checked against the cached
        UTXOs). When ``data_dir`` is set, indices whose address appears in the
        CoinJoin history are skipped, so addresses shared in earlier CoinJoins
        are not reused even if those transactions never confirmed.

        Args:
            mixdepth: Mixdepth to look at
            change: Branch number (0 = external, 1 = internal)

        Returns:
            First candidate index whose address is not in the history

        Raises:
            RuntimeError: If 100 consecutive candidates are all in the history
        """
        # NOTE(review): get_address() caches every address it derives, so the
        # candidate probed below is itself recorded and later calls continue
        # past it — confirm this is intended.
        known_indices = [
            idx
            for md, ch, idx in self.address_cache.values()
            if md == mixdepth and ch == change
        ]
        max_index = max(known_indices, default=-1)

        for utxo in self.utxo_cache.get(mixdepth, []):
            cached_entry = self.address_cache.get(utxo.address)
            if cached_entry is None:
                continue
            md, ch, idx = cached_entry
            if md == mixdepth and ch == change and idx > max_index:
                max_index = idx

        # Check history for used addresses to prevent reuse
        used_addresses: set[str] = set()
        if self.data_dir:
            from jmwallet.history import get_used_addresses

            used_addresses = get_used_addresses(self.data_dir)

        candidate_index = max_index + 1
        max_attempts = 100  # Safety limit to prevent infinite loop

        for _ in range(max_attempts):
            if self.get_address(mixdepth, change, candidate_index) not in used_addresses:
                return candidate_index
            # This address was used in history, try next
            logger.warning(
                f"Skipping index {candidate_index} for mixdepth {mixdepth}, "
                f"change {change} - address was used in previous CoinJoin"
            )
            candidate_index += 1

        # Shouldn't happen unless we have 100 consecutive used addresses
        raise RuntimeError(
            f"Could not find unused address after {max_attempts} attempts. "
            f"This likely indicates a bug in address history tracking."
        )

    async def sync(self) -> dict[int, list[UTXOInfo]]:
        """Run a full wallet sync; kept as a backward-compatible alias of sync_all()."""
        synced = await self.sync_all()
        return synced

    def get_new_address(self, mixdepth: int) -> str:
        """Return the receive address at the next free external index of a mixdepth."""
        return self.get_receive_address(mixdepth, self.get_next_address_index(mixdepth, 0))

    async def close(self) -> None:
        """Shut down the wallet by closing the blockchain backend."""
        backend = self.backend
        await backend.close()

JoinMarket wallet service. Manages BIP84 hierarchical deterministic wallet with mixdepths.

Derivation path: m/84'/{coin_type}'/{mixdepth}'/{change}/{index}, where:

- coin_type: 0 on mainnet, 1 on any other network
- mixdepth: 0-4 (JoinMarket isolation levels)
- change: 0 (external/receive), 1 (internal/change)
- index: address index

Methods

async def close(self) ‑> None
Expand source code
async def close(self) -> None:
    """Release the backend by awaiting its close() coroutine."""
    backend = self.backend
    await backend.close()

Close backend connection

def find_utxo_by_address(self, address: str) ‑> UTXOInfo | None
Expand source code
def find_utxo_by_address(self, address: str) -> UTXOInfo | None:
    """
    Look up a cached UTXO by the address it pays to.

    Mixdepths 0..mixdepth_count-1 are searched in ascending order and, within
    a mixdepth, UTXOs are checked in cache order; the first hit wins. Only the
    in-memory UTXO cache is consulted. Handy for matching CoinJoin outputs to
    history entries.

    Args:
        address: Bitcoin address to search for

    Returns:
        The first matching UTXOInfo, or None when no cached UTXO has that address
    """
    matches = (
        entry
        for depth in range(self.mixdepth_count)
        for entry in self.utxo_cache.get(depth, [])
        if entry.address == address
    )
    return next(matches, None)

Find a UTXO by its address across all mixdepths.

This is useful for matching CoinJoin outputs to history entries. Returns the first matching UTXO found, or None if address not found.

Args

address
Bitcoin address to search for

Returns

UTXOInfo if found, None otherwise

def get_account_xpub(self, mixdepth: int) ‑> str
Expand source code
def get_account_xpub(self, mixdepth: int) -> str:
    """
    Return the account-level extended public key of a mixdepth.

    The account key is derived at {root_path}/{mixdepth}' (i.e.
    m/84'/coin'/mixdepth') and serialized for the wallet's network. The
    result can be used in Bitcoin Core descriptors for efficient scanning.

    Args:
        mixdepth: The mixdepth (account) number (0-4)

    Returns:
        xpub/tpub string for the account
    """
    return self.master_key.derive(f"{self.root_path}/{mixdepth}'").get_xpub(self.network)

Get the extended public key (xpub) for a mixdepth account.

Derives the key at path m/84'/coin'/mixdepth' and returns its xpub. This xpub can be used in Bitcoin Core descriptors for efficient scanning.

Args

mixdepth
The mixdepth (account) number (0-4)

Returns

xpub/tpub string for the account

def get_address(self, mixdepth: int, change: int, index: int) ‑> str
Expand source code
def get_address(self, mixdepth: int, change: int, index: int) -> str:
    """
    Derive the address at {root_path}/{mixdepth}'/{change}/{index}.

    The derived address is also recorded in address_cache, mapped to its
    (mixdepth, change, index) tuple, so it can later be resolved back to a key.

    Args:
        mixdepth: Mixdepth (account) number; must be in [0, mixdepth_count)
        change: Branch number (0 = external/receive, 1 = internal/change)
        index: Address index within the branch

    Returns:
        The address for the given path on the wallet's network

    Raises:
        ValueError: If mixdepth is negative or not below mixdepth_count
    """
    # Guard both ends of the range: a negative mixdepth would otherwise be
    # formatted straight into the derivation path.
    if mixdepth < 0:
        raise ValueError(f"Mixdepth {mixdepth} must be non-negative")
    if mixdepth >= self.mixdepth_count:
        raise ValueError(f"Mixdepth {mixdepth} exceeds maximum {self.mixdepth_count}")

    path = f"{self.root_path}/{mixdepth}'/{change}/{index}"
    key = self.master_key.derive(path)
    address = key.get_address(self.network)

    # Remember where this address came from for reverse lookups.
    self.address_cache[address] = (mixdepth, change, index)

    return address

Get address for given path

def get_all_utxos(self, mixdepth: int, min_confirmations: int = 1) ‑> list[UTXOInfo]
Expand source code
def get_all_utxos(
    self,
    mixdepth: int,
    min_confirmations: int = 1,
) -> list[UTXOInfo]:
    """
    Return every sufficiently confirmed cached UTXO of a mixdepth.

    In contrast to select_utxos() there is no target amount: all cached UTXOs
    meeting the confirmation threshold are returned, in cache order. Used for
    sweep mode to ensure no change output.

    Args:
        mixdepth: Mixdepth to get UTXOs from
        min_confirmations: Minimum confirmations required

    Returns:
        New list with all eligible UTXOs in the mixdepth (empty if none cached)
    """
    confirmed: list[UTXOInfo] = []
    for candidate in self.utxo_cache.get(mixdepth, []):
        if candidate.confirmations >= min_confirmations:
            confirmed.append(candidate)
    return confirmed

Get all UTXOs from a mixdepth for sweep operations.

Unlike select_utxos(), this returns ALL eligible UTXOs regardless of target amount. Used for sweep mode to ensure no change output.

Args

mixdepth
Mixdepth to get UTXOs from
min_confirmations
Minimum confirmations required

Returns

List of all eligible UTXOs in the mixdepth

async def get_balance(self, mixdepth: int) ‑> int
Expand source code
async def get_balance(self, mixdepth: int) -> int:
    """
    Return the summed value of all cached UTXOs of a mixdepth.

    When the mixdepth has no cache entry yet, sync_mixdepth() is awaited
    first; an existing (even empty) entry is used as-is.

    Args:
        mixdepth: Mixdepth to total up

    Returns:
        Sum of the `value` of every cached UTXO (0 when there are none)
    """
    if mixdepth not in self.utxo_cache:
        # Cold cache: populate it before summing.
        await self.sync_mixdepth(mixdepth)

    balance = 0
    for entry in self.utxo_cache.get(mixdepth, []):
        balance += entry.value
    return balance

Get balance for a mixdepth

def get_change_address(self, mixdepth: int, index: int) ‑> str
Expand source code
def get_change_address(self, mixdepth: int, index: int) -> str:
    """
    Return the internal (change) address at the given index.

    Args:
        mixdepth: Mixdepth the address belongs to
        index: Address index within the change branch

    Returns:
        Address produced by get_address() on branch 1
    """
    internal_branch = 1
    return self.get_address(mixdepth, internal_branch, index)

Get internal (change) address

def get_fidelity_bond_address(self, index: int, locktime: int) ‑> str
Expand source code
def get_fidelity_bond_address(self, index: int, locktime: int) -> str:
    """
    Build the P2WSH address of a fidelity bond.

    The bond key for `index` is wrapped in the timelock script
    <locktime> OP_CLTV OP_DROP <pubkey> OP_CHECKSIG (via mk_freeze_script),
    and that script is turned into a P2WSH address for the wallet's network.

    Side effects: the address is recorded in address_cache as
    (0, FIDELITY_BOND_BRANCH, index) and its locktime is stored in
    fidelity_bond_locktime_cache, which is created on first use.

    Args:
        index: Address index within the fidelity bond branch
        locktime: Unix timestamp for the timelock

    Returns:
        P2WSH address for the fidelity bond
    """
    bond_key = self.get_fidelity_bond_key(index, locktime)
    compressed_pubkey_hex = bond_key.get_public_key_bytes(compressed=True).hex()

    # Timelock script, then its P2WSH encoding.
    freeze_script = mk_freeze_script(compressed_pubkey_hex, locktime)
    bond_address = script_to_p2wsh_address(freeze_script, self.network)

    # The tuple cannot carry the locktime (path notation would be
    # m/84'/coin'/0'/2/index:locktime), so it lives in a separate cache.
    self.address_cache[bond_address] = (0, FIDELITY_BOND_BRANCH, index)
    try:
        locktime_cache = self.fidelity_bond_locktime_cache
    except AttributeError:
        # First bond address on this instance: create the cache lazily.
        locktime_cache = {}
        self.fidelity_bond_locktime_cache = locktime_cache
    locktime_cache[bond_address] = locktime

    logger.debug(f"Created fidelity bond address {bond_address} with locktime {locktime}")
    return bond_address

Get a fidelity bond P2WSH address.

Creates a timelocked script: <locktime> OP_CLTV OP_DROP <pubkey> OP_CHECKSIG, wrapped in P2WSH.

Args

index
Address index within the fidelity bond branch
locktime
Unix timestamp for the timelock

Returns

P2WSH address for the fidelity bond

def get_fidelity_bond_key(self, index: int, locktime: int) ‑> HDKey
Expand source code
def get_fidelity_bond_key(self, index: int, locktime: int) -> HDKey:
    """
    Derive the HD key backing a fidelity bond.

    Bonds live at {root_path}/0'/{FIDELITY_BOND_BRANCH}/{index}, i.e.
    m/84'/coin'/0'/2/index. The locktime is NOT part of the derivation path
    and is not used by this method; it is only accepted so the signature
    matches the other fidelity bond helpers.

    Args:
        index: Address index within the fidelity bond branch
        locktime: Unix timestamp for the timelock (stored in path notation as :locktime)

    Returns:
        HDKey for the fidelity bond
    """
    # Mixdepth 0 and the dedicated bond branch are fixed for all bonds.
    bond_branch_path = f"{self.root_path}/0'/{FIDELITY_BOND_BRANCH}"
    return self.master_key.derive(f"{bond_branch_path}/{index}")

Get the HD key for a fidelity bond.

Fidelity bond path: m/84'/coin'/0'/2/index. The locktime is NOT in the derivation path, but stored separately.

Args

index
Address index within the fidelity bond branch
locktime
Unix timestamp for the timelock (stored in path notation as :locktime)

Returns

HDKey for the fidelity bond

def get_fidelity_bond_script(self, index: int, locktime: int) ‑> bytes
Expand source code
def get_fidelity_bond_script(self, index: int, locktime: int) -> bytes:
    """
    Build the timelock script of a fidelity bond.

    The compressed public key of the bond key at `index` is passed, as hex,
    to mk_freeze_script() together with the locktime.

    Args:
        index: Address index within the fidelity bond branch
        locktime: Unix timestamp for the timelock

    Returns:
        Timelock redeem script bytes
    """
    bond_key = self.get_fidelity_bond_key(index, locktime)
    compressed_pubkey = bond_key.get_public_key_bytes(compressed=True)
    return mk_freeze_script(compressed_pubkey.hex(), locktime)

Get the redeem script for a fidelity bond.

Args

index
Address index within the fidelity bond branch
locktime
Unix timestamp for the timelock

Returns

Timelock redeem script bytes

def get_key_for_address(self, address: str) ‑> HDKey | None
Expand source code
def get_key_for_address(self, address: str) -> HDKey | None:
    """
    Derive the HD key of an address this wallet has generated before.

    Only addresses present in address_cache can be resolved; their cached
    (mixdepth, change, index) tuple is turned back into a derivation path.

    Args:
        address: Address to resolve

    Returns:
        HDKey for the address, or None when the address is not in the cache
    """
    location = self.address_cache.get(address)
    if location is None:
        return None

    mixdepth, change, index = location
    return self.master_key.derive(f"{self.root_path}/{mixdepth}'/{change}/{index}")

Get HD key for a known address

def get_locktime_for_address(self, address: str) ‑> int | None
Expand source code
def get_locktime_for_address(self, address: str) -> int | None:
    """
    Look up the locktime recorded for a fidelity bond address.

    The locktime cache only exists once get_fidelity_bond_address() has been
    called on this instance; before that every lookup yields None.

    Args:
        address: The fidelity bond address

    Returns:
        Locktime as Unix timestamp, or None if not a fidelity bond address
    """
    try:
        locktime_cache = self.fidelity_bond_locktime_cache
    except AttributeError:
        # No bond address has been generated yet, so nothing is cached.
        return None
    return locktime_cache.get(address)

Get the locktime for a fidelity bond address.

Args

address
The fidelity bond address

Returns

Locktime as Unix timestamp, or None if not a fidelity bond address

def get_new_address(self, mixdepth: int) ‑> str
Expand source code
def get_new_address(self, mixdepth: int) -> str:
    """
    Return the next unused receive address of a mixdepth.

    Args:
        mixdepth: Mixdepth to hand out an address for

    Returns:
        Receive address at the index chosen by get_next_address_index()
    """
    # Branch 0 is the external (receive) chain.
    fresh_index = self.get_next_address_index(mixdepth, 0)
    receive_address = self.get_receive_address(mixdepth, fresh_index)
    return receive_address

Get next unused receive address for a mixdepth.

def get_next_address_index(self, mixdepth: int, change: int) ‑> int
Expand source code
def get_next_address_index(self, mixdepth: int, change: int) -> int:
    """
    Get next unused address index for mixdepth/change.

    Starts just past the highest index recorded in address_cache for this
    mixdepth/branch and, when a data directory is configured, skips every
    candidate whose address appears in the CoinJoin history. This ensures we
    never reuse addresses that were shared in previous CoinJoins, even if
    those transactions weren't confirmed or we don't know their txid.

    Candidates are probed through get_address(), so every probed address
    (including the one finally returned) ends up in address_cache.

    Args:
        mixdepth: Mixdepth to pick an index for
        change: Branch number (0 = external/receive, 1 = internal/change)

    Returns:
        The first index past the cached maximum whose address is unused

    Raises:
        RuntimeError: If 100 consecutive candidate addresses are all marked
            as used in the history
    """
    # Highest index already known on this branch, or -1 when there is none.
    # A separate pass over the UTXO cache is unnecessary: a UTXO can only
    # contribute an index through its address_cache entry, and every such
    # entry is already visited here.
    max_index = max(
        (
            idx
            for md, ch, idx in self.address_cache.values()
            if md == mixdepth and ch == change
        ),
        default=-1,
    )

    # Check history for used addresses to prevent reuse
    used_addresses: set[str] = set()
    if self.data_dir:
        from jmwallet.history import get_used_addresses

        used_addresses = get_used_addresses(self.data_dir)

    # Find the first index that generates an unused address
    candidate_index = max_index + 1
    max_attempts = 100  # Safety limit to prevent infinite loop

    for _ in range(max_attempts):
        test_address = self.get_address(mixdepth, change, candidate_index)
        if test_address not in used_addresses:
            return candidate_index
        # This address was used in history, try next
        logger.warning(
            f"Skipping index {candidate_index} for mixdepth {mixdepth}, "
            f"change {change} - address was used in previous CoinJoin"
        )
        candidate_index += 1

    # Shouldn't happen unless we have 100 consecutive used addresses
    raise RuntimeError(
        f"Could not find unused address after {max_attempts} attempts. "
        "This likely indicates a bug in address history tracking."
    )

Get next unused address index for mixdepth/change.

Checks both the address/UTXO cache and the CoinJoin history to ensure we never reuse addresses that were shared in previous CoinJoins, even if those transactions weren't confirmed or we don't know their txid.

def get_private_key(self, mixdepth: int, change: int, index: int) ‑> bytes
Expand source code
def get_private_key(self, mixdepth: int, change: int, index: int) -> bytes:
    """
    Return the raw private key at {root_path}/{mixdepth}'/{change}/{index}.

    Args:
        mixdepth: Mixdepth (account) number
        change: Branch number
        index: Address index within the branch

    Returns:
        Private key bytes of the derived key
    """
    derived = self.master_key.derive(f"{self.root_path}/{mixdepth}'/{change}/{index}")
    return derived.get_private_key_bytes()

Get private key for given path

def get_receive_address(self, mixdepth: int, index: int) ‑> str
Expand source code
def get_receive_address(self, mixdepth: int, index: int) -> str:
    """
    Return the external (receive) address at the given index.

    Args:
        mixdepth: Mixdepth the address belongs to
        index: Address index within the receive branch

    Returns:
        Address produced by get_address() on branch 0
    """
    external_branch = 0
    return self.get_address(mixdepth, external_branch, index)

Get external (receive) address

def get_scan_descriptors(self, scan_range: int = 1000) ‑> list[dict[str, typing.Any]]
Expand source code
def get_scan_descriptors(self, scan_range: int = DEFAULT_SCAN_RANGE) -> list[dict[str, Any]]:
    """
    Build ranged wpkh() descriptors covering every mixdepth.

    For each mixdepth, two descriptors are emitted from the account xpub:
    one for the external (receive) branch and one for the internal (change)
    branch, in that order. Ranged descriptors let Bitcoin Core scan a whole
    index range in a single pass through the UTXO set instead of checking
    individual addresses.

    Args:
        scan_range: Number of indices covered per branch; each descriptor
            gets the range [0, scan_range - 1]

    Returns:
        List of descriptor dicts for use with scantxoutset:
        [{"desc": "wpkh(xpub.../0/*)", "range": [0, 999]}, ...]
    """
    descriptors = []

    for mixdepth in range(self.mixdepth_count):
        account_xpub = self.get_account_xpub(mixdepth)

        # Branch 0 = external (receive), branch 1 = internal (change).
        for branch in (0, 1):
            descriptors.append(
                {"desc": f"wpkh({account_xpub}/{branch}/*)", "range": [0, scan_range - 1]}
            )

    logger.debug(
        f"Generated {len(descriptors)} descriptors for {self.mixdepth_count} mixdepths "
        f"with range [0, {scan_range - 1}]"
    )
    return descriptors

Generate descriptors for efficient UTXO scanning with Bitcoin Core.

Creates wpkh() descriptors with xpub and range for all mixdepths, both external (receive) and internal (change) addresses.

Using descriptors with ranges is much more efficient than scanning individual addresses, as Bitcoin Core can scan the entire range in a single pass through the UTXO set.

Args

scan_range
Number of indices to scan per branch; the descriptor range is [0, scan_range - 1] (default 1000, Bitcoin Core's default)

Returns

List of descriptor dicts for use with scantxoutset: [{"desc": "wpkh(xpub…/0/*)", "range": [0, 999]}, …]

async def get_total_balance(self) ‑> int
Expand source code
async def get_total_balance(self) -> int:
    """
    Return the combined balance of all mixdepths.

    get_balance() is awaited for each mixdepth in ascending order, one at a
    time, and the results are added up.

    Returns:
        Sum of the per-mixdepth balances (0 when there are no mixdepths)
    """
    per_mixdepth = [await self.get_balance(depth) for depth in range(self.mixdepth_count)]
    return sum(per_mixdepth)

Get total balance across all mixdepths

async def get_utxos(self, mixdepth: int) ‑> list[UTXOInfo]
Expand source code
async def get_utxos(self, mixdepth: int) -> list[UTXOInfo]:
    """
    Return the cached UTXOs of a mixdepth, syncing first on a cache miss.

    Args:
        mixdepth: Mixdepth to fetch UTXOs for

    Returns:
        The cached list for the mixdepth itself (not a copy), or an empty
        list when the cache still has no entry after syncing
    """
    cache_miss = mixdepth not in self.utxo_cache
    if cache_miss:
        await self.sync_mixdepth(mixdepth)
    return self.utxo_cache.get(mixdepth, [])

Get UTXOs for a mixdepth, syncing if not cached.

def select_utxos(self,
mixdepth: int,
target_amount: int,
min_confirmations: int = 1,
include_utxos: list[UTXOInfo] | None = None) ‑> list[UTXOInfo]
Expand source code
def select_utxos(
    self,
    mixdepth: int,
    target_amount: int,
    min_confirmations: int = 1,
    include_utxos: list[UTXOInfo] | None = None,
) -> list[UTXOInfo]:
    """
    Pick UTXOs from a mixdepth until their value covers a target amount.

    Mandatory UTXOs (include_utxos) are taken first, as given and without a
    confirmation check. If they do not cover the target, cached UTXOs with
    enough confirmations are added largest-first until the target is met.

    Args:
        mixdepth: Mixdepth to select from
        target_amount: Target amount in satoshis
        min_confirmations: Minimum confirmations required
        include_utxos: List of UTXOs that MUST be included in selection

    Returns:
        New list with the mandatory UTXOs followed by the greedily added ones

    Raises:
        ValueError: If mandatory plus eligible UTXOs do not reach target_amount
    """
    mandatory = list(include_utxos) if include_utxos else []

    pool = [
        candidate
        for candidate in self.utxo_cache.get(mixdepth, [])
        if candidate.confirmations >= min_confirmations
    ]
    if mandatory:
        # Mandatory outpoints must not be picked a second time from the pool.
        pinned_outpoints = {(u.txid, u.vout) for u in mandatory}
        pool = [c for c in pool if (c.txid, c.vout) not in pinned_outpoints]
    pool = sorted(pool, key=lambda c: c.value, reverse=True)

    chosen = []
    running_total = 0
    for utxo in mandatory:
        chosen.append(utxo)
        running_total += utxo.value

    if running_total >= target_amount:
        # The mandatory UTXOs alone are enough.
        return chosen

    for utxo in pool:
        chosen.append(utxo)
        running_total += utxo.value
        if running_total >= target_amount:
            return chosen

    # Pool exhausted without reaching the target.
    raise ValueError(f"Insufficient funds: need {target_amount}, have {running_total}")

Select UTXOs for spending from a mixdepth. Uses simple greedy selection strategy.

Args

mixdepth
Mixdepth to select from
target_amount
Target amount in satoshis
min_confirmations
Minimum confirmations required
include_utxos
List of UTXOs that MUST be included in selection
def select_utxos_with_merge(self,
mixdepth: int,
target_amount: int,
min_confirmations: int = 1,
merge_algorithm: str = 'default') ‑> list[UTXOInfo]
Expand source code
def select_utxos_with_merge(
    self,
    mixdepth: int,
    target_amount: int,
    min_confirmations: int = 1,
    merge_algorithm: str = "default",
) -> list[UTXOInfo]:
    """
    Select UTXOs for a maker, optionally adding extras to consolidate.

    A minimum selection is made first (largest-first until the target is
    covered). Depending on the merge algorithm, more UTXOs than strictly
    necessary may then be appended: since takers pay tx fees, makers can add
    extra inputs "for free" to consolidate their UTXOs.

    Args:
        mixdepth: Mixdepth to select from
        target_amount: Minimum target amount in satoshis
        min_confirmations: Minimum confirmations required
        merge_algorithm: Selection strategy:
            - "default": Minimum UTXOs needed (same as select_utxos)
            - "gradual": +1 additional UTXO beyond minimum
            - "greedy": ALL eligible UTXOs from the mixdepth
            - "random": +0 to +2 additional UTXOs randomly
            Any other value behaves like "default".

    Returns:
        List of selected UTXOs (minimum selection first, extras after)

    Raises:
        ValueError: If insufficient funds
    """
    import random as rand_module

    pool = sorted(
        (u for u in self.utxo_cache.get(mixdepth, []) if u.confirmations >= min_confirmations),
        key=lambda u: u.value,
        reverse=True,
    )

    # Minimum selection: largest first until the target is covered.
    picked = []
    running_total = 0
    for utxo in pool:
        picked.append(utxo)
        running_total += utxo.value
        if running_total >= target_amount:
            break

    if running_total < target_amount:
        raise ValueError(f"Insufficient funds: need {target_amount}, have {running_total}")

    # Everything past the minimum selection is a candidate for merging.
    leftovers = pool[len(picked):]

    if merge_algorithm == "greedy":
        extras = leftovers
    elif merge_algorithm == "gradual" and leftovers:
        # One extra input: the smallest, to preserve the larger ones.
        extras = [min(leftovers, key=lambda u: u.value)]
    elif merge_algorithm == "random" and leftovers:
        # Zero to two extra inputs, again preferring the smallest.
        extra_count = rand_module.randint(0, min(2, len(leftovers)))
        extras = sorted(leftovers, key=lambda u: u.value)[:extra_count]
    else:
        # "default" (or an unrecognised name): nothing beyond the minimum.
        extras = []

    picked.extend(extras)
    return picked

Select UTXOs with merge algorithm for maker UTXO consolidation.

Unlike regular select_utxos(), this method can select MORE UTXOs than strictly necessary based on the merge algorithm. Since takers pay tx fees, makers can add extra inputs "for free" to consolidate their UTXOs.

Args

mixdepth
Mixdepth to select from
target_amount
Minimum target amount in satoshis
min_confirmations
Minimum confirmations required
merge_algorithm
Selection strategy: "default" selects the minimum UTXOs needed (same as select_utxos); "gradual" adds +1 additional UTXO beyond the minimum; "greedy" selects ALL eligible UTXOs from the mixdepth; "random" adds +0 to +2 additional UTXOs randomly.

Returns

List of selected UTXOs

Raises

ValueError
If insufficient funds
async def sync(self) ‑> dict[int, list[UTXOInfo]]
Expand source code
async def sync(self) -> dict[int, list[UTXOInfo]]:
    """
    Sync the whole wallet.

    Kept for backward compatibility: this only awaits sync_all() with its
    default arguments and hands back whatever that returns.

    Returns:
        The result of sync_all()
    """
    synced = await self.sync_all()
    return synced

Sync wallet (alias for sync_all for backward compatibility).

async def sync_all(self, fidelity_bond_addresses: list[tuple[str, int, int]] | None = None) ‑> dict[int, list[UTXOInfo]]
Expand source code
async def sync_all(
    self, fidelity_bond_addresses: list[tuple[str, int, int]] | None = None
) -> dict[int, list[UTXOInfo]]:
    """
    Sync every mixdepth, preferring a descriptor scan when available.

    If the backend exposes `scan_descriptors`, the descriptor-based sync is
    tried first and its result returned unless it reports failure (None).
    Otherwise, or after such a failure, each mixdepth is synced one after
    another with sync_mixdepth(). Note that the address-by-address path does
    not use fidelity_bond_addresses.

    Args:
        fidelity_bond_addresses: Optional list of (address, locktime, index) tuples
                                for fidelity bonds to scan with wallet descriptors

    Returns:
        Dictionary mapping mixdepth to list of UTXOs
    """
    logger.info("Syncing all mixdepths...")

    backend_scans_descriptors = hasattr(self.backend, "scan_descriptors")
    if backend_scans_descriptors:
        scanned = await self._sync_all_with_descriptors(fidelity_bond_addresses)
        if scanned is not None:
            return scanned
        # None signals that the descriptor scan did not work out.
        logger.warning("Descriptor scan failed, falling back to address scan")

    # Legacy path: scan address by address, one mixdepth at a time.
    per_mixdepth = {}
    for depth in range(self.mixdepth_count):
        per_mixdepth[depth] = await self.sync_mixdepth(depth)

    utxo_total = sum(len(found) for found in per_mixdepth.values())
    logger.info(f"Sync complete: {utxo_total} total UTXOs")
    return per_mixdepth

Sync all mixdepths, optionally including fidelity bond addresses.

Args

fidelity_bond_addresses
Optional list of (address, locktime, index) tuples for fidelity bonds to scan with wallet descriptors

Returns

Dictionary mapping mixdepth to list of UTXOs

async def sync_fidelity_bonds(self, locktimes: list[int]) ‑> list[UTXOInfo]
Expand source code
async def sync_fidelity_bonds(self, locktimes: list[int]) -> list[UTXOInfo]:
    """
    Sync fidelity bond UTXOs with specific locktimes.

    Fidelity bonds use mixdepth 0, branch 2, with path format:
    m/84'/coin'/0'/2/index:locktime

    For every distinct locktime, bond addresses are generated and queried in
    batches of gap_limit until gap_limit consecutive addresses come back
    without UTXOs. Found UTXOs are merged into the mixdepth 0 cache: an entry
    already cached for the same outpoint (txid, vout) is replaced rather than
    duplicated, so calling this method repeatedly does not inflate the cache.

    Args:
        locktimes: List of Unix timestamps to scan for (repeated values are
            scanned only once)

    Returns:
        List of fidelity bond UTXOs found
    """
    utxos: list[UTXOInfo] = []

    if not locktimes:
        logger.debug("No locktimes provided for fidelity bond sync")
        return utxos

    # dict.fromkeys() drops repeated locktimes while keeping the caller's
    # order; scanning the same locktime twice would report each UTXO twice.
    for locktime in dict.fromkeys(locktimes):
        consecutive_empty = 0
        index = 0

        while consecutive_empty < self.gap_limit:
            # Generate one batch of addresses for this locktime
            addresses = [
                self.get_fidelity_bond_address(index + i, locktime)
                for i in range(self.gap_limit)
            ]

            # Fetch UTXOs
            backend_utxos = await self.backend.get_utxos(addresses)

            # Group by address, ignoring anything outside this batch
            utxos_by_address: dict[str, list] = {addr: [] for addr in addresses}
            for utxo in backend_utxos:
                if utxo.address in utxos_by_address:
                    utxos_by_address[utxo.address].append(utxo)

            # Process results in index order so the gap counter is meaningful
            for i, address in enumerate(addresses):
                addr_utxos = utxos_by_address[address]

                if addr_utxos:
                    consecutive_empty = 0
                    for utxo in addr_utxos:
                        # Path includes locktime notation
                        path = (
                            f"{self.root_path}/0'/{FIDELITY_BOND_BRANCH}/{index + i}:{locktime}"
                        )
                        utxo_info = UTXOInfo(
                            txid=utxo.txid,
                            vout=utxo.vout,
                            value=utxo.value,
                            address=address,
                            confirmations=utxo.confirmations,
                            scriptpubkey=utxo.scriptpubkey,
                            path=path,
                            mixdepth=0,  # Fidelity bonds always in mixdepth 0
                            height=utxo.height,
                            locktime=locktime,  # Store locktime for P2WSH signing
                        )
                        utxos.append(utxo_info)
                        logger.info(
                            f"Found fidelity bond UTXO: {utxo.txid}:{utxo.vout} "
                            f"value={utxo.value} locktime={locktime}"
                        )
                else:
                    consecutive_empty += 1

                # Stop mid-batch once the gap limit is reached
                if consecutive_empty >= self.gap_limit:
                    break

            index += self.gap_limit

    # Merge fidelity bond UTXOs into the mixdepth 0 cache
    if utxos:
        found_outpoints = {(found.txid, found.vout) for found in utxos}
        cached = self.utxo_cache.setdefault(0, [])
        # Drop stale copies of the outpoints just found (e.g. from an earlier
        # call) before appending the fresh ones; updated in place so existing
        # references to the cached list stay valid.
        cached[:] = [
            entry for entry in cached if (entry.txid, entry.vout) not in found_outpoints
        ]
        cached.extend(utxos)
        logger.info(f"Found {len(utxos)} fidelity bond UTXOs")

    return utxos

Sync fidelity bond UTXOs with specific locktimes.

Fidelity bonds use mixdepth 0, branch 2, with path format: m/84'/coin'/0'/2/index:locktime

Args

locktimes
List of Unix timestamps to scan for

Returns

List of fidelity bond UTXOs found

async def sync_mixdepth(self, mixdepth: int) ‑> list[UTXOInfo]
Expand source code
async def sync_mixdepth(self, mixdepth: int) -> list[UTXOInfo]:
    """
    Sync a mixdepth with the blockchain.
    Scans addresses up to gap limit.

    Both branches (0 = external, 1 = internal) are scanned in turn. Addresses
    are generated and queried in batches of gap_limit; scanning of a branch
    stops once gap_limit consecutive addresses have no UTXOs reported by the
    backend. The collected UTXOs replace the cache entry for this mixdepth.

    Args:
        mixdepth: Mixdepth to scan

    Returns:
        List of UTXOs found on both branches (also stored in utxo_cache)
    """
    utxos: list[UTXOInfo] = []

    for change in [0, 1]:
        # Gap counter and scan position are tracked per branch
        consecutive_empty = 0
        index = 0

        while consecutive_empty < self.gap_limit:
            # Scan in batches of gap_limit size for performance
            batch_size = self.gap_limit
            addresses = []

            for i in range(batch_size):
                address = self.get_address(mixdepth, change, index + i)
                addresses.append(address)

            # Fetch UTXOs for the whole batch
            backend_utxos = await self.backend.get_utxos(addresses)

            # Group results by address
            # (UTXOs for addresses outside this batch are ignored)
            utxos_by_address: dict[str, list] = {addr: [] for addr in addresses}
            for utxo in backend_utxos:
                if utxo.address in utxos_by_address:
                    utxos_by_address[utxo.address].append(utxo)

            # Process batch results in order
            # (index order matters: the gap counter resets on every hit)
            for i, address in enumerate(addresses):
                addr_utxos = utxos_by_address[address]

                if addr_utxos:
                    consecutive_empty = 0
                    for utxo in addr_utxos:
                        path = f"{self.root_path}/{mixdepth}'/{change}/{index + i}"
                        utxo_info = UTXOInfo(
                            txid=utxo.txid,
                            vout=utxo.vout,
                            value=utxo.value,
                            address=address,
                            confirmations=utxo.confirmations,
                            scriptpubkey=utxo.scriptpubkey,
                            path=path,
                            mixdepth=mixdepth,
                            height=utxo.height,
                        )
                        utxos.append(utxo_info)
                else:
                    consecutive_empty += 1

                # Gap limit reached mid-batch: the rest of the batch is skipped
                if consecutive_empty >= self.gap_limit:
                    break

            index += batch_size

        # `index` counts whole batches, hence the "~"; the branch of a UTXO is
        # read back from the second-to-last component of its path
        logger.debug(
            f"Synced mixdepth {mixdepth} change {change}: "
            f"scanned ~{index} addresses, found "
            f"{len([u for u in utxos if u.path.split('/')[-2] == str(change)])} UTXOs"
        )

    # Replaces (does not merge with) whatever was cached for this mixdepth
    self.utxo_cache[mixdepth] = utxos
    return utxos

Sync a mixdepth with the blockchain. Scans addresses up to gap limit.