Module jmwallet.cli
JoinMarket Wallet CLI - Manage wallets, generate addresses, and handle fidelity bonds.
Functions
def decrypt_mnemonic(encrypted_data: bytes, password: str) ‑> str-
Expand source code
def decrypt_mnemonic(encrypted_data: bytes, password: str) -> str:
    """
    Decrypt a mnemonic with a password.

    The input layout is the one produced by ``encrypt_mnemonic``: the first
    16 bytes are the PBKDF2 salt and the remainder is the Fernet token.

    Args:
        encrypted_data: The encrypted bytes (salt + Fernet token)
        password: The password for decryption

    Returns:
        The decrypted mnemonic phrase

    Raises:
        ValueError: If the data is too short, or decryption fails
            (wrong password or corrupted data)
    """
    # Reject obviously malformed input before importing the crypto stack, so
    # the caller always gets the documented ValueError for short input.
    if len(encrypted_data) < 16:
        raise ValueError("Invalid encrypted data")

    from cryptography.fernet import Fernet, InvalidToken
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

    # Extract salt and encrypted token
    salt = encrypted_data[:16]
    encrypted_token = encrypted_data[16:]

    # Derive key from password; the parameters must match encrypt_mnemonic
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=600_000,
    )
    key = base64.urlsafe_b64encode(kdf.derive(password.encode("utf-8")))

    # Decrypt
    fernet = Fernet(key)
    try:
        decrypted = fernet.decrypt(encrypted_token)
    except InvalidToken as e:
        raise ValueError("Decryption failed - wrong password or corrupted file") from e

    try:
        return decrypted.decode("utf-8")
    except UnicodeDecodeError as e:
        # A token that authenticates but does not hold UTF-8 text is still
        # unusable data; report it through the documented exception type.
        raise ValueError("Decryption failed - wrong password or corrupted file") from e
Args
encrypted_data- The encrypted bytes (salt + Fernet token)
password- The password for decryption
Returns
The decrypted mnemonic phrase
Raises
ValueError- If decryption fails (wrong password or corrupted data)
def encrypt_mnemonic(mnemonic: str, password: str) ‑> bytes-
Expand source code
def encrypt_mnemonic(mnemonic: str, password: str) -> bytes:
    """
    Encrypt a mnemonic phrase under a password-derived Fernet key.

    A fresh 16-byte random salt is generated per call and fed, together with
    the UTF-8 encoded password, into PBKDF2-HMAC-SHA256 (600,000 iterations)
    to derive the 32-byte key that Fernet expects in URL-safe base64 form.

    Args:
        mnemonic: The mnemonic phrase to encrypt
        password: The password for encryption

    Returns:
        The 16-byte salt immediately followed by the Fernet token
    """
    from cryptography.fernet import Fernet
    from cryptography.hazmat.primitives import hashes
    from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

    # Per-file random salt; it is stored in the clear as the output prefix
    salt = os.urandom(16)

    # Stretch the password into raw key material
    raw_key = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=600_000,  # High iteration count for security
    ).derive(password.encode("utf-8"))

    # Fernet takes its key as URL-safe base64
    cipher = Fernet(base64.urlsafe_b64encode(raw_key))
    token = cipher.encrypt(mnemonic.encode("utf-8"))

    return salt + token
Uses PBKDF2 to derive a key from the password.
Args
mnemonic- The mnemonic phrase to encrypt
password- The password for encryption
Returns
Encrypted bytes: the 16-byte random salt followed by the Fernet token (the token itself is URL-safe base64)
def generate(word_count: "Annotated[int, typer.Option('--words', '-w', help='Number of words (12, 15, 18, 21, or 24)')]" = 24,
save: "Annotated[bool, typer.Option('--save', '-s', help='Save to file')]" = False,
output_file: "Annotated[Path | None, typer.Option('--output', '-o', help='Output file path')]" = None,
password: "Annotated[str | None, typer.Option('--password', '-p', help='Password for encryption')]" = None,
prompt_password: "Annotated[bool, typer.Option('--prompt-password', help='Prompt for password interactively')]" = False) ‑> None-
Expand source code
@app.command()
def generate(
    word_count: Annotated[
        int, typer.Option("--words", "-w", help="Number of words (12, 15, 18, 21, or 24)")
    ] = 24,
    save: Annotated[bool, typer.Option("--save", "-s", help="Save to file")] = False,
    output_file: Annotated[
        Path | None, typer.Option("--output", "-o", help="Output file path")
    ] = None,
    password: Annotated[
        str | None, typer.Option("--password", "-p", help="Password for encryption")
    ] = None,
    prompt_password: Annotated[
        bool, typer.Option("--prompt-password", help="Prompt for password interactively")
    ] = False,
) -> None:
    """Generate a new BIP39 mnemonic phrase with secure entropy.

    Without ``--save`` the phrase is printed to the terminal. With ``--save``
    it is written to ``output_file`` (default
    ``~/.joinmarket-ng/wallets/default.mnemonic``), encrypted when a password
    is supplied via ``--password`` or ``--prompt-password``.

    Exits with status 1 when generation or validation fails, when the two
    prompted passwords differ, or on any unexpected error.
    """
    setup_logging()

    try:
        mnemonic = generate_mnemonic_secure(word_count)

        # Validate the generated mnemonic
        if not validate_mnemonic(mnemonic):
            logger.error("Generated mnemonic failed validation - this should not happen")
            raise typer.Exit(1)

        if save:
            if output_file is None:
                output_file = Path.home() / ".joinmarket-ng" / "wallets" / "default.mnemonic"

            # Prompt for password if requested
            if prompt_password:
                password = typer.prompt("Enter encryption password", hide_input=True)
                confirm = typer.prompt("Confirm password", hide_input=True)
                if password != confirm:
                    logger.error("Passwords do not match")
                    raise typer.Exit(1)

            save_mnemonic_file(mnemonic, output_file, password)

            typer.echo(f"\nMnemonic saved to: {output_file}")
            if password:
                typer.echo("File is encrypted - you will need the password to use it.")
            else:
                typer.echo("WARNING: File is NOT encrypted - consider using --password")
            typer.echo("KEEP THIS FILE SECURE - IT CONTROLS YOUR FUNDS!")
        else:
            typer.echo("\n" + "=" * 80)
            typer.echo("GENERATED MNEMONIC - WRITE THIS DOWN AND KEEP IT SAFE!")
            typer.echo("=" * 80)
            typer.echo(f"\n{mnemonic}\n")
            typer.echo("=" * 80)
            typer.echo("\nThis mnemonic controls your Bitcoin funds.")
            typer.echo("Anyone with this phrase can spend your coins.")
            typer.echo("Store it securely offline - NEVER share it with anyone!")
            typer.echo("=" * 80 + "\n")

    except (typer.Exit, typer.Abort):
        # Exit/Abort are RuntimeError subclasses. Without this clause the
        # generic handler below would catch the deliberate exits raised above
        # (and a Ctrl-C at the password prompt) and log them as
        # "Unexpected error: ...".
        raise
    except ValueError as e:
        logger.error(f"Failed to generate mnemonic: {e}")
        raise typer.Exit(1)
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        raise typer.Exit(1)
def generate_bond_address(mnemonic: "Annotated[str | None, typer.Option('--mnemonic')]" = None,
mnemonic_file: "Annotated[Path | None, typer.Option('--mnemonic-file', '-f')]" = None,
password: "Annotated[str | None, typer.Option('--password', '-p')]" = None,
locktime: "Annotated[int, typer.Option('--locktime', '-L', help='Locktime as Unix timestamp')]" = 0,
locktime_date: "Annotated[str | None, typer.Option('--locktime-date', '-d', help='Locktime as date (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)')]" = None,
index: "Annotated[int, typer.Option('--index', '-i', help='Address index')]" = 0,
network: "Annotated[str, typer.Option('--network', '-n')]" = 'mainnet',
data_dir: "Annotated[Path | None, typer.Option('--data-dir', help='Data directory (default: ~/.joinmarket-ng or $JOINMARKET_DATA_DIR)')]" = None,
no_save: "Annotated[bool, typer.Option('--no-save', help='Do not save the bond to the registry')]" = False,
log_level: "Annotated[str, typer.Option('--log-level', '-l')]" = 'INFO') ‑> None-
Expand source code
@app.command("generate-bond-address")
def generate_bond_address(
    mnemonic: Annotated[str | None, typer.Option("--mnemonic")] = None,
    mnemonic_file: Annotated[Path | None, typer.Option("--mnemonic-file", "-f")] = None,
    password: Annotated[str | None, typer.Option("--password", "-p")] = None,
    locktime: Annotated[
        int, typer.Option("--locktime", "-L", help="Locktime as Unix timestamp")
    ] = 0,
    locktime_date: Annotated[
        str | None,
        typer.Option(
            "--locktime-date", "-d", help="Locktime as date (YYYY-MM-DD or YYYY-MM-DD HH:MM:SS)"
        ),
    ] = None,
    index: Annotated[int, typer.Option("--index", "-i", help="Address index")] = 0,
    network: Annotated[str, typer.Option("--network", "-n")] = "mainnet",
    data_dir: Annotated[
        Path | None,
        typer.Option(
            "--data-dir",
            help="Data directory (default: ~/.joinmarket-ng or $JOINMARKET_DATA_DIR)",
        ),
    ] = None,
    no_save: Annotated[
        bool,
        typer.Option("--no-save", help="Do not save the bond to the registry"),
    ] = False,
    log_level: Annotated[str, typer.Option("--log-level", "-l")] = "INFO",
) -> None:
    """Generate a fidelity bond (timelocked P2WSH) address.

    Resolves the mnemonic, determines the locktime (``--locktime-date``
    overrides ``--locktime`` when both are given), derives the key at
    ``m/84'/<coin_type>'/0'/<FIDELITY_BOND_BRANCH>/<index>``, builds the freeze
    script and its P2WSH address, optionally records the bond in the registry
    under the data directory, and prints the details.

    Exits with status 1 when the mnemonic cannot be resolved, the date cannot
    be parsed, or no positive locktime was supplied.
    """
    setup_logging(log_level)

    try:
        resolved_mnemonic = _resolve_mnemonic(mnemonic, mnemonic_file, password, True)
    except (FileNotFoundError, ValueError) as e:
        logger.error(str(e))
        raise typer.Exit(1)

    # Parse locktime
    if locktime_date:
        try:
            # Try full datetime format first
            try:
                dt = datetime.strptime(locktime_date, "%Y-%m-%d %H:%M:%S")
            except ValueError:
                # Try date-only format
                dt = datetime.strptime(locktime_date, "%Y-%m-%d")
            # NOTE(review): `dt` is a naive datetime, so .timestamp() interprets
            # it in the machine's local timezone. The same date string can
            # therefore yield a different locktime (and address) on hosts in
            # different timezones - confirm this is intended.
            locktime = int(dt.timestamp())
        except ValueError:
            logger.error(f"Invalid date format: {locktime_date}")
            logger.info("Use format: YYYY-MM-DD or YYYY-MM-DD HH:MM:SS")
            raise typer.Exit(1)

    # The default of 0 means "not supplied"
    if locktime <= 0:
        logger.error("Locktime is required. Use --locktime or --locktime-date")
        raise typer.Exit(1)

    # Validate locktime is in the future (warning only; generation continues)
    if locktime <= datetime.now().timestamp():
        logger.warning("Locktime is in the past - the bond will be immediately spendable")

    from jmcore.btc_script import disassemble_script, mk_freeze_script
    from jmcore.paths import get_default_data_dir

    from jmwallet.wallet.address import script_to_p2wsh_address
    from jmwallet.wallet.bip32 import HDKey, mnemonic_to_seed
    from jmwallet.wallet.bond_registry import (
        create_bond_info,
        load_registry,
        save_registry,
    )
    from jmwallet.wallet.service import FIDELITY_BOND_BRANCH

    seed = mnemonic_to_seed(resolved_mnemonic)
    master_key = HDKey.from_seed(seed)

    # Any network other than "mainnet" uses coin type 1
    coin_type = 0 if network == "mainnet" else 1
    path = f"m/84'/{coin_type}'/0'/{FIDELITY_BOND_BRANCH}/{index}"

    key = master_key.derive(path)
    pubkey_hex = key.get_public_key_bytes(compressed=True).hex()

    witness_script = mk_freeze_script(pubkey_hex, locktime)
    address = script_to_p2wsh_address(witness_script, network)

    # Display-only conversion; fromtimestamp() renders in local time
    locktime_dt = datetime.fromtimestamp(locktime)
    disassembled = disassemble_script(witness_script)

    # Resolve data directory
    resolved_data_dir = data_dir if data_dir else get_default_data_dir()

    # Save to registry unless --no-save
    saved = False
    existing = False
    if not no_save:
        registry = load_registry(resolved_data_dir)
        existing_bond = registry.get_bond_by_address(address)
        if existing_bond:
            # An already-registered address is left untouched
            existing = True
            logger.info(f"Bond already exists in registry (created: {existing_bond.created_at})")
        else:
            bond_info = create_bond_info(
                address=address,
                locktime=locktime,
                index=index,
                path=path,
                pubkey_hex=pubkey_hex,
                witness_script=witness_script,
                network=network,
            )
            registry.add_bond(bond_info)
            save_registry(registry, resolved_data_dir)
            saved = True

    print("\n" + "=" * 80)
    print("FIDELITY BOND ADDRESS")
    print("=" * 80)
    print(f"\nAddress: {address}")
    print(f"Locktime: {locktime} ({locktime_dt.strftime('%Y-%m-%d %H:%M:%S')})")
    print(f"Index: {index}")
    print(f"Network: {network}")
    print(f"Path: {path}")
    print()
    print("-" * 80)
    print("WITNESS SCRIPT (redeemScript)")
    print("-" * 80)
    print(f"Hex: {witness_script.hex()}")
    print(f"Disassembled: {disassembled}")
    print("-" * 80)
    # Exactly one registry status line is printed
    if saved:
        print(f"\nSaved to registry: {resolved_data_dir / 'fidelity_bonds.json'}")
    elif existing:
        print("\nBond already in registry (not updated)")
    elif no_save:
        print("\nNot saved to registry (--no-save)")
    print("\n" + "=" * 80)
    print("IMPORTANT: Funds sent to this address are LOCKED until the locktime!")
    print(" Make sure you have backed up your mnemonic.")
    print("=" * 80 + "\n")
def generate_mnemonic_secure(word_count: int = 24) ‑> str-
Expand source code
def generate_mnemonic_secure(word_count: int = 24) -> str:
    """
    Produce a fresh English BIP39 mnemonic of the requested length.

    Args:
        word_count: Number of words (12, 15, 18, 21, or 24)

    Returns:
        BIP39 mnemonic phrase with valid checksum

    Raises:
        ValueError: If word_count is not one of the supported lengths
    """
    from mnemonic import Mnemonic

    supported_lengths = (12, 15, 18, 21, 24)
    if word_count not in supported_lengths:
        raise ValueError("word_count must be 12, 15, 18, 21, or 24")

    # Each word carries 11 bits and the checksum is entropy/32 bits, so
    # entropy = word_count * 11 * 32 / 33. The table below is that formula
    # evaluated for the five supported lengths.
    strength_for_length = {12: 128, 15: 160, 18: 192, 21: 224, 24: 256}
    strength = strength_for_length[word_count]

    return Mnemonic("english").generate(strength=strength)
Args
word_count- Number of words (12, 15, 18, 21, or 24)
Returns
BIP39 mnemonic phrase with valid checksum
def history(limit: "Annotated[int | None, typer.Option('--limit', '-n', help='Max entries to show')]" = None,
role: "Annotated[str | None, typer.Option('--role', '-r', help='Filter by role (maker/taker)')]" = None,
stats: "Annotated[bool, typer.Option('--stats', '-s', help='Show statistics only')]" = False,
csv_output: "Annotated[bool, typer.Option('--csv', help='Output as CSV')]" = False,
data_dir: "Annotated[Path | None, typer.Option('--data-dir', help='Data directory (default: ~/.joinmarket-ng or $JOINMARKET_DATA_DIR)')]" = None) ‑> None-
Expand source code
@app.command()
def history(
    limit: Annotated[int | None, typer.Option("--limit", "-n", help="Max entries to show")] = None,
    role: Annotated[
        str | None, typer.Option("--role", "-r", help="Filter by role (maker/taker)")
    ] = None,
    stats: Annotated[bool, typer.Option("--stats", "-s", help="Show statistics only")] = False,
    csv_output: Annotated[bool, typer.Option("--csv", help="Output as CSV")] = False,
    data_dir: Annotated[
        Path | None,
        typer.Option(
            "--data-dir",
            help="Data directory (default: ~/.joinmarket-ng or $JOINMARKET_DATA_DIR)",
        ),
    ] = None,
) -> None:
    """View CoinJoin transaction history.

    With ``--stats`` only the aggregate statistics are printed (the role and
    limit filters are not applied to them). Otherwise the entries are listed
    either as a fixed-width table or, with ``--csv``, as CSV on stdout.

    Exits with status 1 when ``--role`` is neither "maker" nor "taker"
    (compared case-insensitively).
    """
    from jmwallet.history import get_history_stats, read_history

    role_filter: Literal["maker", "taker"] | None = None
    if role:
        if role.lower() not in ("maker", "taker"):
            logger.error("Role must be 'maker' or 'taker'")
            raise typer.Exit(1)
        # Validated just above, so the lowered value is one of the two literals
        role_filter = role.lower()  # type: ignore[assignment]

    if stats:
        stats_data = get_history_stats(data_dir)

        print("\n" + "=" * 60)
        print("COINJOIN HISTORY STATISTICS")
        print("=" * 60)
        print(f"Total CoinJoins: {stats_data['total_coinjoins']}")
        print(f" As Maker: {stats_data['maker_coinjoins']}")
        print(f" As Taker: {stats_data['taker_coinjoins']}")
        print(f"Success Rate: {stats_data['success_rate']:.1f}%")
        print(f"Total Volume: {stats_data['total_volume']:,} sats")
        print(f"Total Fees Earned: {stats_data['total_fees_earned']:,} sats")
        print(f"Total Fees Paid: {stats_data['total_fees_paid']:,} sats")
        print("=" * 60 + "\n")
        return

    entries = read_history(data_dir, limit, role_filter)

    if not entries:
        print("\nNo CoinJoin history found.")
        return

    if csv_output:
        import csv as csv_module
        import sys

        fieldnames = [
            "timestamp",
            "role",
            "txid",
            "cj_amount",
            "peer_count",
            "net_fee",
            "success",
        ]
        writer = csv_module.DictWriter(sys.stdout, fieldnames=fieldnames)
        writer.writeheader()
        for entry in entries:
            writer.writerow(
                {
                    "timestamp": entry.timestamp,
                    "role": entry.role,
                    "txid": entry.txid,
                    "cj_amount": entry.cj_amount,
                    # An unknown peer count becomes an empty CSV cell
                    "peer_count": entry.peer_count if entry.peer_count is not None else "",
                    "net_fee": entry.net_fee,
                    "success": entry.success,
                }
            )
    else:
        print(f"\nCoinJoin History ({len(entries)} entries):")
        print("=" * 140)
        header = f"{'Timestamp':<20} {'Role':<7} {'Amount':>12} {'Peers':>6}"
        header += f" {'Net Fee':>12} {'TXID':<64}"
        print(header)
        print("-" * 140)

        for entry in entries:
            status = "" if entry.success else " [FAILED]"
            txid_full = entry.txid if entry.txid else "N/A"
            # Non-zero fees get an explicit sign; zero is shown bare
            fee_str = f"{entry.net_fee:+,}" if entry.net_fee != 0 else "0"
            peer_str = str(entry.peer_count) if entry.peer_count is not None else "?"
            # Only the first 19 characters of the timestamp are shown
            print(
                f"{entry.timestamp[:19]:<20} {entry.role:<7} {entry.cj_amount:>12,} "
                f"{peer_str:>6} {fee_str:>12} {txid_full:<64}{status}"
            )

        print("=" * 140)
def info(mnemonic: "Annotated[str | None, typer.Option('--mnemonic', help='BIP39 mnemonic')]" = None,
mnemonic_file: "Annotated[Path | None, typer.Option('--mnemonic-file', '-f', help='Path to mnemonic file')]" = None,
password: "Annotated[str | None, typer.Option('--password', '-p', help='Password for encrypted file')]" = None,
network: "Annotated[str, typer.Option('--network', '-n', help='Bitcoin network')]" = 'mainnet',
backend_type: "Annotated[str, typer.Option('--backend', '-b', help='Backend: full_node | neutrino')]" = 'full_node',
rpc_url: "Annotated[str, typer.Option('--rpc-url', envvar='BITCOIN_RPC_URL')]" = 'http://127.0.0.1:8332',
rpc_user: "Annotated[str, typer.Option('--rpc-user', envvar='BITCOIN_RPC_USER')]" = '',
rpc_password: "Annotated[str, typer.Option('--rpc-password', envvar='BITCOIN_RPC_PASSWORD')]" = '',
neutrino_url: "Annotated[str, typer.Option('--neutrino-url', envvar='NEUTRINO_URL')]" = 'http://127.0.0.1:8334',
log_level: "Annotated[str, typer.Option('--log-level', '-l')]" = 'INFO') ‑> None-
Expand source code
@app.command()
def info(
    mnemonic: Annotated[str | None, typer.Option("--mnemonic", help="BIP39 mnemonic")] = None,
    mnemonic_file: Annotated[
        Path | None, typer.Option("--mnemonic-file", "-f", help="Path to mnemonic file")
    ] = None,
    password: Annotated[
        str | None, typer.Option("--password", "-p", help="Password for encrypted file")
    ] = None,
    network: Annotated[str, typer.Option("--network", "-n", help="Bitcoin network")] = "mainnet",
    backend_type: Annotated[
        str, typer.Option("--backend", "-b", help="Backend: full_node | neutrino")
    ] = "full_node",
    rpc_url: Annotated[
        str, typer.Option("--rpc-url", envvar="BITCOIN_RPC_URL")
    ] = "http://127.0.0.1:8332",
    rpc_user: Annotated[str, typer.Option("--rpc-user", envvar="BITCOIN_RPC_USER")] = "",
    rpc_password: Annotated[
        str, typer.Option("--rpc-password", envvar="BITCOIN_RPC_PASSWORD")
    ] = "",
    neutrino_url: Annotated[
        str, typer.Option("--neutrino-url", envvar="NEUTRINO_URL")
    ] = "http://127.0.0.1:8334",
    log_level: Annotated[str, typer.Option("--log-level", "-l")] = "INFO",
) -> None:
    """Display wallet information and balances by mixdepth.

    Resolves the mnemonic from the CLI options, then runs the async wallet
    report to completion. Exits with status 1 if the mnemonic cannot be
    resolved.
    """
    setup_logging(log_level)

    try:
        seed_phrase = _resolve_mnemonic(mnemonic, mnemonic_file, password, True)
    except (FileNotFoundError, ValueError) as err:
        logger.error(str(err))
        raise typer.Exit(1)

    # Build the coroutine first, then drive it on a fresh event loop
    report = _show_wallet_info(
        seed_phrase,
        network,
        backend_type,
        rpc_url,
        rpc_user,
        rpc_password,
        neutrino_url,
    )
    asyncio.run(report)
def list_bonds(mnemonic: "Annotated[str | None, typer.Option('--mnemonic')]" = None,
mnemonic_file: "Annotated[Path | None, typer.Option('--mnemonic-file', '-f')]" = None,
password: "Annotated[str | None, typer.Option('--password', '-p')]" = None,
network: "Annotated[str, typer.Option('--network', '-n')]" = 'mainnet',
backend_type: "Annotated[str, typer.Option('--backend', '-b')]" = 'full_node',
rpc_url: "Annotated[str, typer.Option('--rpc-url', envvar='BITCOIN_RPC_URL')]" = 'http://127.0.0.1:8332',
rpc_user: "Annotated[str, typer.Option('--rpc-user', envvar='BITCOIN_RPC_USER')]" = '',
rpc_password: "Annotated[str, typer.Option('--rpc-password', envvar='BITCOIN_RPC_PASSWORD')]" = '',
locktimes: "Annotated[list[int] | None, typer.Option('--locktime', '-L', help='Locktime(s) to scan for')]" = None,
log_level: "Annotated[str, typer.Option('--log-level', '-l')]" = 'INFO') ‑> None-
Expand source code
@app.command()
def list_bonds(
    mnemonic: Annotated[str | None, typer.Option("--mnemonic")] = None,
    mnemonic_file: Annotated[Path | None, typer.Option("--mnemonic-file", "-f")] = None,
    password: Annotated[str | None, typer.Option("--password", "-p")] = None,
    network: Annotated[str, typer.Option("--network", "-n")] = "mainnet",
    backend_type: Annotated[str, typer.Option("--backend", "-b")] = "full_node",
    rpc_url: Annotated[
        str, typer.Option("--rpc-url", envvar="BITCOIN_RPC_URL")
    ] = "http://127.0.0.1:8332",
    rpc_user: Annotated[str, typer.Option("--rpc-user", envvar="BITCOIN_RPC_USER")] = "",
    rpc_password: Annotated[
        str, typer.Option("--rpc-password", envvar="BITCOIN_RPC_PASSWORD")
    ] = "",
    locktimes: Annotated[
        list[int] | None, typer.Option("--locktime", "-L", help="Locktime(s) to scan for")
    ] = None,
    log_level: Annotated[str, typer.Option("--log-level", "-l")] = "INFO",
) -> None:
    """List all fidelity bonds in the wallet.

    Resolves the mnemonic from the CLI options and runs the async bond scan,
    passing an empty list when no ``--locktime`` values were given. Exits with
    status 1 if the mnemonic cannot be resolved.
    """
    setup_logging(log_level)

    try:
        seed_phrase = _resolve_mnemonic(mnemonic, mnemonic_file, password, True)
    except (FileNotFoundError, ValueError) as err:
        logger.error(str(err))
        raise typer.Exit(1)

    # The scan always receives a list, never None
    scan_locktimes = locktimes or []
    scan = _list_fidelity_bonds(
        seed_phrase,
        network,
        backend_type,
        rpc_url,
        rpc_user,
        rpc_password,
        scan_locktimes,
    )
    asyncio.run(scan)
def load_mnemonic_file(mnemonic_file: Path, password: str | None = None) ‑> str-
Expand source code
def load_mnemonic_file( mnemonic_file: Path, password: str | None = None, ) -> str: """ Load a mnemonic from a file, decrypting if necessary. Args: mnemonic_file: Path to the mnemonic file password: Password for decryption (required if file is encrypted) Returns: The mnemonic phrase Raises: ValueError: If file is encrypted but no password provided """ if not mnemonic_file.exists(): raise FileNotFoundError(f"Mnemonic file not found: {mnemonic_file}") data = mnemonic_file.read_bytes() # Try to detect if file is encrypted # Encrypted files start with 16-byte salt + Fernet token # Plaintext files are ASCII only try: text = data.decode("utf-8") # Check if it looks like a valid mnemonic (words separated by spaces) words = text.strip().split() if len(words) in (12, 15, 18, 21, 24) and all(w.isalpha() for w in words): return text.strip() except UnicodeDecodeError: pass # File appears to be encrypted if not password: raise ValueError( "Mnemonic file appears to be encrypted. Please provide a password with --password" ) return decrypt_mnemonic(data, password)Load a mnemonic from a file, decrypting if necessary.
Args
mnemonic_file- Path to the mnemonic file
password- Password for decryption (required if file is encrypted)
Returns
The mnemonic phrase
Raises
ValueError- If file is encrypted but no password provided (a missing file raises FileNotFoundError instead)
def main() ‑> None-
Expand source code
def main() -> None:
    """CLI entry point.

    Invokes the module-level ``app`` object, the same object the
    ``@app.command()`` decorators in this module register their commands on.
    """
    app()
def registry_list(data_dir: "Annotated[Path | None, typer.Option('--data-dir', help='Data directory (default: ~/.joinmarket-ng or $JOINMARKET_DATA_DIR)')]" = None,
funded_only: "Annotated[bool, typer.Option('--funded-only', '-f', help='Show only funded bonds')]" = False,
active_only: "Annotated[bool, typer.Option('--active-only', '-a', help='Show only active (funded & not expired) bonds')]" = False,
json_output: "Annotated[bool, typer.Option('--json', '-j', help='Output as JSON')]" = False,
log_level: "Annotated[str, typer.Option('--log-level', '-l')]" = 'WARNING') ‑> None-
Expand source code
@app.command("registry-list")
def registry_list(
    data_dir: Annotated[
        Path | None,
        typer.Option(
            "--data-dir",
            help="Data directory (default: ~/.joinmarket-ng or $JOINMARKET_DATA_DIR)",
        ),
    ] = None,
    funded_only: Annotated[
        bool,
        typer.Option("--funded-only", "-f", help="Show only funded bonds"),
    ] = False,
    active_only: Annotated[
        bool,
        typer.Option("--active-only", "-a", help="Show only active (funded & not expired) bonds"),
    ] = False,
    json_output: Annotated[
        bool,
        typer.Option("--json", "-j", help="Output as JSON"),
    ] = False,
    log_level: Annotated[str, typer.Option("--log-level", "-l")] = "WARNING",
) -> None:
    """List all fidelity bonds in the registry.

    Loads the registry from the data directory and prints the selected bonds
    as JSON (``--json``) or as a fixed-width table followed by the registry's
    "best" bond, if it reports one. ``--active-only`` takes precedence over
    ``--funded-only`` when both are given.
    """
    setup_logging(log_level)

    from jmcore.paths import get_default_data_dir

    from jmwallet.wallet.bond_registry import load_registry

    resolved_data_dir = data_dir if data_dir else get_default_data_dir()
    registry = load_registry(resolved_data_dir)

    if active_only:
        bonds = registry.get_active_bonds()
    elif funded_only:
        bonds = registry.get_funded_bonds()
    else:
        bonds = registry.bonds

    # JSON mode prints the list even when it is empty and skips the table
    if json_output:
        import json

        output = [bond.model_dump() for bond in bonds]
        print(json.dumps(output, indent=2))
        return

    if not bonds:
        print("\nNo fidelity bonds found in registry.")
        print(f"Registry: {resolved_data_dir / 'fidelity_bonds.json'}")
        return

    print(f"\nFidelity Bonds ({len(bonds)} total)")
    print("=" * 120)
    header = f"{'Address':<64} {'Locktime':<20} {'Status':<15} {'Value':>15} {'Index':>6}"
    print(header)
    print("-" * 120)

    for bond in bonds:
        # Status
        if bond.is_funded and not bond.is_expired:
            status = "ACTIVE"
        elif bond.is_funded and bond.is_expired:
            # NOTE(review): this label is 16 characters but the column is 15
            # wide, so such rows are shifted right by one character.
            status = "EXPIRED (funded)"
        elif bond.is_expired:
            status = "EXPIRED"
        else:
            status = "UNFUNDED"

        # Value (a falsy value, including 0, is shown as "-")
        value_str = f"{bond.value:,} sats" if bond.value else "-"

        print(
            f"{bond.address:<64} {bond.locktime_human:<20} {status:<15} "
            f"{value_str:>15} {bond.index:>6}"
        )

    print("=" * 120)

    # Show best bond if any active
    best = registry.get_best_bond()
    if best:
        print(f"\nBest bond for advertising: {best.address[:20]}...{best.address[-8:]}")
        print(f" Value: {best.value:,} sats, Unlock in: {best.time_until_unlock:,}s")
def registry_show(address: "Annotated[str, typer.Argument(help='Bond address to show')]",
data_dir: "Annotated[Path | None, typer.Option('--data-dir', help='Data directory (default: ~/.joinmarket-ng or $JOINMARKET_DATA_DIR)')]" = None,
json_output: "Annotated[bool, typer.Option('--json', '-j', help='Output as JSON')]" = False,
log_level: "Annotated[str, typer.Option('--log-level', '-l')]" = 'WARNING') ‑> None-
Expand source code
@app.command("registry-show")
def registry_show(
    address: Annotated[str, typer.Argument(help="Bond address to show")],
    data_dir: Annotated[
        Path | None,
        typer.Option(
            "--data-dir",
            help="Data directory (default: ~/.joinmarket-ng or $JOINMARKET_DATA_DIR)",
        ),
    ] = None,
    json_output: Annotated[
        bool,
        typer.Option("--json", "-j", help="Output as JSON"),
    ] = False,
    log_level: Annotated[str, typer.Option("--log-level", "-l")] = "WARNING",
) -> None:
    """Show detailed information about a specific fidelity bond.

    Looks the bond up by address in the registry stored under the data
    directory and prints it as JSON (``--json``) or as a human-readable
    report. Exits with status 1 when the address is not in the registry.
    """
    setup_logging(log_level)

    from jmcore.btc_script import disassemble_script
    from jmcore.paths import get_default_data_dir

    from jmwallet.wallet.bond_registry import load_registry

    resolved_data_dir = data_dir if data_dir else get_default_data_dir()
    registry = load_registry(resolved_data_dir)

    bond = registry.get_bond_by_address(address)
    if not bond:
        print(f"\nBond not found: {address}")
        print(f"Registry: {resolved_data_dir / 'fidelity_bonds.json'}")
        raise typer.Exit(1)

    if json_output:
        import json

        print(json.dumps(bond.model_dump(), indent=2))
        return

    print("\n" + "=" * 80)
    print("FIDELITY BOND DETAILS")
    print("=" * 80)
    print(f"\nAddress: {bond.address}")
    print(f"Network: {bond.network}")
    print(f"Index: {bond.index}")
    print(f"Path: {bond.path}")
    print(f"Public Key: {bond.pubkey}")
    print()
    print(f"Locktime: {bond.locktime} ({bond.locktime_human})")
    if bond.is_expired:
        print("Status: EXPIRED (can be spent)")
    else:
        # Split the remaining time into whole days and leftover whole hours
        # (presumably time_until_unlock is in seconds - the divisors are
        # 86400 and 3600; confirm against the bond model)
        remaining = bond.time_until_unlock
        days = remaining // 86400
        hours = (remaining % 86400) // 3600
        print(f"Status: LOCKED ({days}d {hours}h remaining)")
    print()
    print("-" * 80)
    print("WITNESS SCRIPT")
    print("-" * 80)
    # The registry stores the script as hex; decode it for disassembly
    witness_script = bytes.fromhex(bond.witness_script_hex)
    print(f"Hex: {bond.witness_script_hex}")
    print(f"Disassembled: {disassemble_script(witness_script)}")
    print()
    print("-" * 80)
    print("FUNDING STATUS")
    print("-" * 80)
    if bond.is_funded:
        print(f"TXID: {bond.txid}")
        print(f"Vout: {bond.vout}")
        print(f"Value: {bond.value:,} sats")
        print(f"Confirmations: {bond.confirmations}")
    else:
        print("Not funded (or not yet synced)")
    print()
    print(f"Created: {bond.created_at}")
    print("=" * 80 + "\n")
def registry_sync(mnemonic: "Annotated[str | None, typer.Option('--mnemonic')]" = None,
mnemonic_file: "Annotated[Path | None, typer.Option('--mnemonic-file', '-f')]" = None,
password: "Annotated[str | None, typer.Option('--password', '-p')]" = None,
network: "Annotated[str, typer.Option('--network', '-n')]" = 'mainnet',
rpc_url: "Annotated[str, typer.Option('--rpc-url', envvar='BITCOIN_RPC_URL')]" = 'http://127.0.0.1:8332',
rpc_user: "Annotated[str, typer.Option('--rpc-user', envvar='BITCOIN_RPC_USER')]" = '',
rpc_password: "Annotated[str, typer.Option('--rpc-password', envvar='BITCOIN_RPC_PASSWORD')]" = '',
data_dir: "Annotated[Path | None, typer.Option('--data-dir', help='Data directory (default: ~/.joinmarket-ng or $JOINMARKET_DATA_DIR)')]" = None,
log_level: "Annotated[str, typer.Option('--log-level', '-l')]" = 'INFO') ‑> None-
Expand source code
@app.command("registry-sync")
def registry_sync(
    mnemonic: Annotated[str | None, typer.Option("--mnemonic")] = None,
    mnemonic_file: Annotated[Path | None, typer.Option("--mnemonic-file", "-f")] = None,
    password: Annotated[str | None, typer.Option("--password", "-p")] = None,
    network: Annotated[str, typer.Option("--network", "-n")] = "mainnet",
    rpc_url: Annotated[
        str, typer.Option("--rpc-url", envvar="BITCOIN_RPC_URL")
    ] = "http://127.0.0.1:8332",
    rpc_user: Annotated[str, typer.Option("--rpc-user", envvar="BITCOIN_RPC_USER")] = "",
    rpc_password: Annotated[
        str, typer.Option("--rpc-password", envvar="BITCOIN_RPC_PASSWORD")
    ] = "",
    data_dir: Annotated[
        Path | None,
        typer.Option(
            "--data-dir",
            help="Data directory (default: ~/.joinmarket-ng or $JOINMARKET_DATA_DIR)",
        ),
    ] = None,
    log_level: Annotated[str, typer.Option("--log-level", "-l")] = "INFO",
) -> None:
    """Sync fidelity bond funding status from the blockchain.

    Resolves the mnemonic, loads the bond registry from the data directory and
    runs the async sync over it. Exits with status 1 if the mnemonic cannot be
    resolved, and with status 0 (after a hint) when the registry has no bonds.
    """
    setup_logging(log_level)

    try:
        seed_phrase = _resolve_mnemonic(mnemonic, mnemonic_file, password, True)
    except (FileNotFoundError, ValueError) as err:
        logger.error(str(err))
        raise typer.Exit(1)

    from jmcore.paths import get_default_data_dir

    from jmwallet.wallet.bond_registry import load_registry

    registry_dir = data_dir or get_default_data_dir()
    bond_registry = load_registry(registry_dir)

    # Nothing to sync is not an error
    if not bond_registry.bonds:
        print("\nNo bonds in registry to sync.")
        print("Use 'generate-bond-address' to create bonds first.")
        raise typer.Exit(0)

    sync_job = _sync_bonds_async(
        bond_registry,
        seed_phrase,
        network,
        rpc_url,
        rpc_user,
        rpc_password,
        registry_dir,
    )
    asyncio.run(sync_job)
def save_mnemonic_file(mnemonic: str, output_file: Path, password: str | None = None) ‑> None-
Expand source code
def save_mnemonic_file(
    mnemonic: str,
    output_file: Path,
    password: str | None = None,
) -> None:
    """
    Save a mnemonic to a file, optionally encrypted.

    The file is created with owner-only permissions (0o600) from the start,
    so the secret is never on disk with the broader permissions a default
    umask would grant. Plaintext is written as UTF-8, which is the encoding
    ``load_mnemonic_file`` decodes.

    Args:
        mnemonic: The mnemonic phrase to save
        output_file: The output file path (parent directories are created)
        password: Optional password for encryption; a falsy value saves plaintext
    """
    output_file.parent.mkdir(parents=True, exist_ok=True)

    if password:
        payload = encrypt_mnemonic(mnemonic, password)
    else:
        payload = mnemonic.encode("utf-8")

    # Pass the restrictive mode at creation time rather than chmod-ing after
    # the secret has already been written.
    fd = os.open(output_file, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
    with os.fdopen(fd, "wb") as handle:
        handle.write(payload)

    # The creation mode is ignored when the file already existed, so tighten
    # the permissions of a pre-existing file as well.
    os.chmod(output_file, 0o600)

    if password:
        logger.info(f"Encrypted mnemonic saved to {output_file}")
    else:
        logger.warning(f"Mnemonic saved to {output_file} (PLAINTEXT - consider using --password)")
Args
mnemonic- The mnemonic phrase to save
output_file- The output file path
password- Optional password for encryption
def send(destination: "Annotated[str, typer.Argument(help='Destination address')]",
amount: "Annotated[int, typer.Option('--amount', '-a', help='Amount in sats (0 for sweep)')]" = 0,
mnemonic: "Annotated[str | None, typer.Option('--mnemonic')]" = None,
mnemonic_file: "Annotated[Path | None, typer.Option('--mnemonic-file', '-f')]" = None,
password: "Annotated[str | None, typer.Option('--password', '-p')]" = None,
mixdepth: "Annotated[int, typer.Option('--mixdepth', '-m', help='Source mixdepth')]" = 0,
fee_rate: "Annotated[int, typer.Option('--fee-rate', help='Fee rate in sat/vB')]" = 10,
network: "Annotated[str, typer.Option('--network', '-n')]" = 'mainnet',
rpc_url: "Annotated[str, typer.Option('--rpc-url', envvar='BITCOIN_RPC_URL')]" = 'http://127.0.0.1:8332',
rpc_user: "Annotated[str, typer.Option('--rpc-user', envvar='BITCOIN_RPC_USER')]" = '',
rpc_password: "Annotated[str, typer.Option('--rpc-password', envvar='BITCOIN_RPC_PASSWORD')]" = '',
broadcast: "Annotated[bool, typer.Option('--broadcast', help='Broadcast the transaction')]" = True,
yes: "Annotated[bool, typer.Option('--yes', '-y', help='Skip confirmation prompt')]" = False,
log_level: "Annotated[str, typer.Option('--log-level', '-l')]" = 'INFO') ‑> None-
Expand source code
@app.command()
def send(
    destination: Annotated[str, typer.Argument(help="Destination address")],
    amount: Annotated[int, typer.Option("--amount", "-a", help="Amount in sats (0 for sweep)")] = 0,
    mnemonic: Annotated[str | None, typer.Option("--mnemonic")] = None,
    mnemonic_file: Annotated[Path | None, typer.Option("--mnemonic-file", "-f")] = None,
    password: Annotated[str | None, typer.Option("--password", "-p")] = None,
    mixdepth: Annotated[int, typer.Option("--mixdepth", "-m", help="Source mixdepth")] = 0,
    fee_rate: Annotated[int, typer.Option("--fee-rate", help="Fee rate in sat/vB")] = 10,
    network: Annotated[str, typer.Option("--network", "-n")] = "mainnet",
    rpc_url: Annotated[
        str, typer.Option("--rpc-url", envvar="BITCOIN_RPC_URL")
    ] = "http://127.0.0.1:8332",
    rpc_user: Annotated[str, typer.Option("--rpc-user", envvar="BITCOIN_RPC_USER")] = "",
    rpc_password: Annotated[
        str, typer.Option("--rpc-password", envvar="BITCOIN_RPC_PASSWORD")
    ] = "",
    # Declared as an on/off pair: with a lone "--broadcast" name and a default
    # of True there was no explicit way to turn broadcasting off.
    broadcast: Annotated[
        bool,
        typer.Option("--broadcast/--no-broadcast", help="Broadcast the transaction"),
    ] = True,
    yes: Annotated[bool, typer.Option("--yes", "-y", help="Skip confirmation prompt")] = False,
    log_level: Annotated[str, typer.Option("--log-level", "-l")] = "INFO",
) -> None:
    """Send a simple transaction from wallet to an address.

    Resolves the mnemonic from the CLI options and runs the async send with
    the given destination, amount (0 sweeps the mixdepth), source mixdepth,
    fee rate and RPC settings. Pass ``--no-broadcast`` to skip broadcasting.

    Exits with status 1 if the mnemonic cannot be resolved.
    """
    setup_logging(log_level)

    try:
        resolved_mnemonic = _resolve_mnemonic(mnemonic, mnemonic_file, password, True)
    except (FileNotFoundError, ValueError) as e:
        logger.error(str(e))
        raise typer.Exit(1)

    asyncio.run(
        _send_transaction(
            resolved_mnemonic,
            destination,
            amount,
            mixdepth,
            fee_rate,
            network,
            rpc_url,
            rpc_user,
            rpc_password,
            broadcast,
            yes,
        )
    )
def setup_logging(level: str = 'INFO') ‑> None-
Expand source code
def setup_logging(level: str = "INFO") -> None:
    """Reset the logger and attach a single stderr sink at the given level.

    Args:
        level: Log level name; it is upper-cased before being passed on.
    """
    line_format = "<green>{time:HH:mm:ss}</green> | <level>{level: <8}</level> | {message}"

    # Drop any previously registered sinks so only the one below is active
    logger.remove()
    logger.add(sys.stderr, level=level.upper(), format=line_format)
def validate(mnemonic_arg: "Annotated[str | None, typer.Argument(help='Mnemonic to validate')]" = None,
mnemonic_file: "Annotated[Path | None, typer.Option('--mnemonic-file', '-f', help='Path to mnemonic file')]" = None,
password: "Annotated[str | None, typer.Option('--password', '-p')]" = None) ‑> None-
Expand source code
@app.command()
def validate(
    mnemonic_arg: Annotated[str | None, typer.Argument(help="Mnemonic to validate")] = None,
    mnemonic_file: Annotated[
        Path | None, typer.Option("--mnemonic-file", "-f", help="Path to mnemonic file")
    ] = None,
    password: Annotated[str | None, typer.Option("--password", "-p")] = None,
) -> None:
    """Validate a BIP39 mnemonic phrase.

    The phrase comes from ``--mnemonic-file`` if given, otherwise from the
    positional argument, otherwise from an interactive prompt. Exits with
    status 1 when the file cannot be loaded or the phrase is invalid.
    """
    phrase: str
    if mnemonic_file:
        try:
            phrase = load_mnemonic_file(mnemonic_file, password)
        except (FileNotFoundError, ValueError) as err:
            print(f"Error: {err}")
            raise typer.Exit(1)
    elif mnemonic_arg:
        phrase = mnemonic_arg
    else:
        phrase = typer.prompt("Enter mnemonic to validate")

    # Report the failure first; the success path then needs no nesting
    if not validate_mnemonic(phrase):
        print("Mnemonic is INVALID")
        raise typer.Exit(1)

    print("Mnemonic is VALID")
    print(f"Word count: {len(phrase.strip().split())}")
def validate_mnemonic(mnemonic: str) ‑> bool-
Expand source code
def validate_mnemonic(mnemonic: str) -> bool:
    """
    Check a phrase against the English BIP39 wordlist and checksum.

    Args:
        mnemonic: The mnemonic phrase to validate

    Returns:
        True if valid, False otherwise
    """
    from mnemonic import Mnemonic

    return Mnemonic("english").check(mnemonic)
Args
mnemonic- The mnemonic phrase to validate
Returns
True if valid, False otherwise