Module jmcore.bitcoin
Bitcoin utilities for JoinMarket.
This module provides consolidated Bitcoin operations: - Address encoding/decoding (bech32, base58) - Hash functions (hash160, hash256) - Transaction parsing/serialization - Varint encoding/decoding
Uses external libraries for security-critical operations: - bech32: BIP173 bech32 encoding - base58: Base58Check encoding
Functions
def address_to_scriptpubkey(address: str) ‑> bytes-
Expand source code
def address_to_scriptpubkey(address: str) -> bytes: """ Convert Bitcoin address to scriptPubKey. Supports: - P2WPKH (bc1q..., tb1q..., bcrt1q...) - P2WSH (bc1q... 62 chars) - P2TR (bc1p... taproot) - P2PKH (1..., m..., n...) - P2SH (3..., 2...) Args: address: Bitcoin address string Returns: scriptPubKey bytes """ # Bech32 (SegWit) addresses if address.startswith(("bc1", "tb1", "bcrt1")): hrp_end = 4 if address.startswith("bcrt") else 2 hrp = address[:hrp_end] bech32_decoded = bech32_lib.decode(hrp, address) if bech32_decoded[0] is None or bech32_decoded[1] is None: raise ValueError(f"Invalid bech32 address: {address}") witver = bech32_decoded[0] witprog = bytes(bech32_decoded[1]) if witver == 0: if len(witprog) == 20: # P2WPKH: OP_0 <20-byte-pubkeyhash> return bytes([0x00, 0x14]) + witprog elif len(witprog) == 32: # P2WSH: OP_0 <32-byte-scripthash> return bytes([0x00, 0x20]) + witprog elif witver == 1 and len(witprog) == 32: # P2TR: OP_1 <32-byte-pubkey> return bytes([0x51, 0x20]) + witprog raise ValueError(f"Unsupported witness version: {witver}") # Base58 addresses (legacy) decoded = base58.b58decode_check(address) version = decoded[0] payload = decoded[1:] if version in (0x00, 0x6F): # Mainnet/Testnet P2PKH # P2PKH: OP_DUP OP_HASH160 <20-byte-pubkeyhash> OP_EQUALVERIFY OP_CHECKSIG return bytes([0x76, 0xA9, 0x14]) + payload + bytes([0x88, 0xAC]) elif version in (0x05, 0xC4): # Mainnet/Testnet P2SH # P2SH: OP_HASH160 <20-byte-scripthash> OP_EQUAL return bytes([0xA9, 0x14]) + payload + bytes([0x87]) raise ValueError(f"Unknown address version: {version}")Convert Bitcoin address to scriptPubKey.
Supports: - P2WPKH (bc1q…, tb1q…, bcrt1q…) - P2WSH (bc1q… 62 chars) - P2TR (bc1p… taproot) - P2PKH (1…, m…, n…) - P2SH (3…, 2…)
Args
address- Bitcoin address string
Returns
scriptPubKey bytes
def btc_to_sats(btc: float) ‑> int-
Expand source code
def btc_to_sats(btc: float) -> int: """ Convert BTC to satoshis safely. Uses round() instead of int() to avoid floating point precision errors that can truncate values (e.g. 0.0003 * 1e8 = 29999.999...). Args: btc: Amount in BTC Returns: Amount in satoshis """ return round(btc * SATS_PER_BTC)Convert BTC to satoshis safely.
Uses round() instead of int() to avoid floating point precision errors that can truncate values (e.g. 0.0003 * 1e8 = 29999.999…).
Args
btc- Amount in BTC
Returns
Amount in satoshis
def calculate_relative_fee(amount_sats: int, fee_rate: str) ‑> int-
Expand source code
def calculate_relative_fee(amount_sats: int, fee_rate: str) -> int: """ Calculate relative fee in satoshis from a fee rate string. Uses Decimal arithmetic with banker's rounding (ROUND_HALF_EVEN) to match the reference JoinMarket implementation. This is critical for sweep mode where the maker expects the exact same fee calculation. Args: amount_sats: Amount in satoshis fee_rate: Fee rate as decimal string (e.g., "0.001" = 0.1%) Returns: Fee in satoshis (rounded to nearest integer) Examples: >>> calculate_relative_fee(100_000_000, "0.001") 100000 # 0.1% of 1 BTC >>> calculate_relative_fee(50_000_000, "0.002") 100000 # 0.2% of 0.5 BTC >>> calculate_relative_fee(9994243, "0.000022") 220 # matches reference implementation's Decimal rounding """ from decimal import Decimal validate_satoshi_amount(amount_sats) # Handle integer strings like "0" or "1" if "." not in fee_rate: try: val = int(fee_rate) return int(amount_sats * val) except ValueError as e: raise ValueError(f"Fee rate must be decimal string or integer, got {fee_rate}") from e # Use Decimal for exact arithmetic, matching reference implementation # Reference uses: int((Decimal(cjfee) * Decimal(cj_amount)).quantize(Decimal(1))) # quantize(Decimal(1)) uses ROUND_HALF_EVEN (banker's rounding) by default return int((Decimal(fee_rate) * Decimal(amount_sats)).quantize(Decimal(1)))Calculate relative fee in satoshis from a fee rate string.
Uses Decimal arithmetic with banker's rounding (ROUND_HALF_EVEN) to match the reference JoinMarket implementation. This is critical for sweep mode where the maker expects the exact same fee calculation.
Args
amount_sats- Amount in satoshis
fee_rate- Fee rate as decimal string (e.g., "0.001" = 0.1%)
Returns
Fee in satoshis (rounded to nearest integer)
Examples
>>> calculate_relative_fee(100_000_000, "0.001") 100000 # 0.1% of 1 BTC >>> calculate_relative_fee(50_000_000, "0.002") 100000 # 0.2% of 0.5 BTC >>> calculate_relative_fee(9994243, "0.000022") 220 # matches reference implementation's Decimal rounding def calculate_sweep_amount(available_sats: int, relative_fees: list[str]) ‑> int-
Expand source code
def calculate_sweep_amount(available_sats: int, relative_fees: list[str]) -> int: """ Calculate CoinJoin amount for a sweep (no change output). The taker must pay maker fees from the swept amount: available = cj_amount + fees fees = sum(fee_rate * cj_amount for each maker) Solving for cj_amount: available = cj_amount * (1 + sum(fee_rates)) cj_amount = available / (1 + sum(fee_rates)) Args: available_sats: Total available balance in satoshis relative_fees: List of relative fee strings (e.g., ["0.001", "0.002"]) Returns: CoinJoin amount in satoshis (maximum amount after paying all fees) """ validate_satoshi_amount(available_sats) if not relative_fees: return available_sats # Parse all fee rates as fractions with common denominator # Example: ["0.001", "0.0015"] -> numerators=[1, 15], denominator=10000 try: max_decimals = 0 for fee in relative_fees: if "." in fee: max_decimals = max(max_decimals, len(fee.split(".")[1])) except IndexError as e: raise ValueError(f"Invalid fee format in {relative_fees}") from e denominator = 10**max_decimals sum_numerators = 0 for fee_rate in relative_fees: if "." in fee_rate: parts = fee_rate.split(".") # Normalize to common denominator # "0.001" with max_decimals=4 -> 10 (because 0.001 = 10/10000) numerator = int(parts[0] + parts[1]) * (10 ** (max_decimals - len(parts[1]))) sum_numerators += numerator else: # Handle integer fee rates (unlikely for relative fees but good for robustness) numerator = int(fee_rate) * denominator sum_numerators += numerator # cj_amount = available / (1 + sum_rel_fees) # = available / ((denominator + sum_numerators) / denominator) # = (available * denominator) / (denominator + sum_numerators) return (available_sats * denominator) // (denominator + sum_numerators)Calculate CoinJoin amount for a sweep (no change output).
The taker must pay maker fees from the swept amount: available = cj_amount + fees fees = sum(fee_rate * cj_amount for each maker)
Solving for cj_amount: available = cj_amount * (1 + sum(fee_rates)) cj_amount = available / (1 + sum(fee_rates))
Args
available_sats- Total available balance in satoshis
relative_fees- List of relative fee strings (e.g., ["0.001", "0.002"])
Returns
CoinJoin amount in satoshis (maximum amount after paying all fees)
def calculate_tx_vsize(tx_bytes: bytes) ‑> int-
Expand source code
def calculate_tx_vsize(tx_bytes: bytes) -> int: """ Calculate actual virtual size (vbytes) from a signed transaction. For SegWit transactions: vsize = ceil((3 * non_witness_size + total_size) / 4) For legacy transactions: vsize = total_size Args: tx_bytes: Serialized transaction bytes Returns: Virtual size in vbytes """ total_size = len(tx_bytes) # Check if this is a SegWit transaction (has marker 0x00 and flag 0x01 after version) if len(tx_bytes) > 6 and tx_bytes[4] == 0x00 and tx_bytes[5] == 0x01: # SegWit transaction - need to calculate non-witness size # Parse to find witness data boundaries offset = 4 # Skip version # Skip marker and flag offset += 2 # Read input count input_count, offset = decode_varint(tx_bytes, offset) # Skip inputs (each has: 32 txid + 4 vout + varint script_len + script + 4 sequence) for _ in range(input_count): offset += 32 + 4 # txid + vout script_len, offset = decode_varint(tx_bytes, offset) offset += script_len + 4 # script + sequence # Read output count output_count, offset = decode_varint(tx_bytes, offset) # Skip outputs (each has: 8 value + varint script_len + script) for _ in range(output_count): offset += 8 # value script_len, offset = decode_varint(tx_bytes, offset) offset += script_len # Now offset points to the start of witness data witness_start = offset # Skip witness data (one stack per input) for _ in range(input_count): stack_count, offset = decode_varint(tx_bytes, offset) for _ in range(stack_count): item_len, offset = decode_varint(tx_bytes, offset) offset += item_len # After witness comes locktime (4 bytes) witness_end = offset # Non-witness size = total - witness_data - marker(1) - flag(1) witness_size = witness_end - witness_start non_witness_size = total_size - witness_size - 2 # -2 for marker and flag # Weight = non_witness_size * 4 + witness_size (witness counts as 1 weight unit per byte) # But we also need to add marker+flag to witness weight (they're part of witness) weight = non_witness_size * 4 + witness_size + 2 # +2 for marker/flag at 1 wu each # vsize = ceil(weight / 4) return (weight + 3) // 4 else: # Legacy transaction - vsize equals byte size return total_sizeCalculate actual virtual size (vbytes) from a signed transaction.
For SegWit transactions: vsize = ceil((3 * non_witness_size + total_size) / 4) For legacy transactions: vsize = total_size
Args
tx_bytes- Serialized transaction bytes
Returns
Virtual size in vbytes
def create_p2wpkh_script_code(pubkey: bytes | str) ‑> bytes-
Expand source code
def create_p2wpkh_script_code(pubkey: bytes | str) -> bytes: """ Create scriptCode for P2WPKH signing (BIP143). For P2WPKH, the scriptCode is the P2PKH script: OP_DUP OP_HASH160 <20-byte-pubkeyhash> OP_EQUALVERIFY OP_CHECKSIG Args: pubkey: Public key bytes or hex Returns: 25-byte scriptCode """ if isinstance(pubkey, str): pubkey = bytes.fromhex(pubkey) pubkey_hash = hash160(pubkey) # OP_DUP OP_HASH160 PUSH20 <pkh> OP_EQUALVERIFY OP_CHECKSIG return b"\x76\xa9\x14" + pubkey_hash + b"\x88\xac"Create scriptCode for P2WPKH signing (BIP143).
For P2WPKH, the scriptCode is the P2PKH script: OP_DUP OP_HASH160 <20-byte-pubkeyhash> OP_EQUALVERIFY OP_CHECKSIG
Args
pubkey- Public key bytes or hex
Returns
25-byte scriptCode
def decode_varint(data: bytes, offset: int = 0) ‑> tuple[int, int]-
Expand source code
def decode_varint(data: bytes, offset: int = 0) -> tuple[int, int]: """ Decode Bitcoin varint from bytes. Args: data: Input bytes offset: Starting offset in data Returns: (value, new_offset) tuple """ first = data[offset] if first < 0xFD: return first, offset + 1 elif first == 0xFD: return struct.unpack("<H", data[offset + 1 : offset + 3])[0], offset + 3 elif first == 0xFE: return struct.unpack("<I", data[offset + 1 : offset + 5])[0], offset + 5 else: return struct.unpack("<Q", data[offset + 1 : offset + 9])[0], offset + 9Decode Bitcoin varint from bytes.
Args
data- Input bytes
offset- Starting offset in data
Returns
(value, new_offset) tuple
def encode_varint(n: int) ‑> bytes-
Expand source code
def encode_varint(n: int) -> bytes: """ Encode integer as Bitcoin varint. Args: n: Integer to encode Returns: Encoded bytes """ if n < 0xFD: return bytes([n]) elif n <= 0xFFFF: return bytes([0xFD]) + struct.pack("<H", n) elif n <= 0xFFFFFFFF: return bytes([0xFE]) + struct.pack("<I", n) else: return bytes([0xFF]) + struct.pack("<Q", n)Encode integer as Bitcoin varint.
Args
n- Integer to encode
Returns
Encoded bytes
def estimate_vsize(input_types: list[str], output_types: list[str]) ‑> int-
Expand source code
def estimate_vsize(input_types: list[str], output_types: list[str]) -> int: """ Estimate transaction virtual size (vbytes). Based on JoinMarket reference implementation logic. Args: input_types: List of input types (e.g. ["p2wpkh", "p2wsh"]) output_types: List of output types (e.g. ["p2wpkh", "p2wsh"]) Returns: Estimated vsize in bytes """ # Sizes in weight units (wu) = 4 * vbytes # Base transaction overhead: version(4) + locktime(4) + input_count(1) + output_count(1) # SegWit marker(1) + flag(1) # Total base: 10 bytes -> 40 wu # We assume varints for counts are 1 byte (up to 252 inputs/outputs) base_weight = 40 + 2 # +2 for marker/flag weight (witness data) # Input sizes (weight units) # P2WPKH: # Non-witness: 32(txid) + 4(vout) + 1(script_len) + 4(seq) = 41 bytes -> 164 wu # Witness: 1(stack_len) + 1(sig_len) + 72(sig) + 1(pub_len) + 33(pub) = 108 wu # Total: 272 wu (68 vbytes) # P2WSH (fidelity bond): # Non-witness: 41 bytes -> 164 wu # Witness: 1(stack_len) + 1(sig_len) + 72(sig) + 1(script_len) + 43(script) = 118 wu # Total: 282 wu (70.5 vbytes) - Ref impl uses slightly different calc, let's stick to calculated input_weights = { "p2wpkh": 41 * 4 + 108, "p2wsh": 41 * 4 + 118, # Using 72 byte sig + 43 byte script (fidelity bond) } # Output sizes (weight units) # P2WPKH: 8(val) + 1(len) + 22(script) = 31 bytes -> 124 wu # P2WSH: 8(val) + 1(len) + 34(script) = 43 bytes -> 172 wu # P2TR: 8(val) + 1(len) + 34(script) = 43 bytes -> 172 wu output_weights = { "p2wpkh": 31 * 4, "p2wsh": 43 * 4, "p2tr": 43 * 4, "p2pkh": 34 * 4, "p2sh": 32 * 4, } weight = base_weight for inp in input_types: weight += input_weights.get(inp, 272) # Default to P2WPKH if unknown for out in output_types: weight += output_weights.get(out, 124) # Default to P2WPKH # vsize = ceil(weight / 4) return (weight + 3) // 4Estimate transaction virtual size (vbytes).
Based on JoinMarket reference implementation logic.
Args
input_types- List of input types (e.g. ["p2wpkh", "p2wsh"])
output_types- List of output types (e.g. ["p2wpkh", "p2wsh"])
Returns
Estimated vsize in bytes
def format_amount(sats: int, include_unit: bool = True) ‑> str-
Expand source code
def format_amount(sats: int, include_unit: bool = True) -> str: """ Format satoshi amount as string. Default: '1,000,000 sats (0.01000000 BTC)' Args: sats: Amount in satoshis include_unit: Whether to include units and BTC conversion Returns: Formatted string """ if include_unit: btc_val = sats_to_btc(sats) return f"{sats:,} sats ({btc_val:.8f} BTC)" return f"{sats:,}"Format satoshi amount as string. Default: '1,000,000 sats (0.01000000 BTC)'
Args
sats- Amount in satoshis
include_unit- Whether to include units and BTC conversion
Returns
Formatted string
def get_address_type(address: str) ‑> str-
Expand source code
def get_address_type(address: str) -> str: """ Determine address type from string. Args: address: Bitcoin address Returns: Address type: "p2wpkh", "p2wsh", "p2tr", "p2pkh", "p2sh" Raises: ValueError: If address is invalid or unknown type """ # Bech32 (SegWit) if address.startswith(("bc1", "tb1", "bcrt1")): hrp_end = 4 if address.startswith("bcrt") else 2 hrp = address[:hrp_end] decoded = bech32_lib.decode(hrp, address) if decoded[0] is None or decoded[1] is None: raise ValueError(f"Invalid bech32 address: {address}") witver = decoded[0] witprog = bytes(decoded[1]) if witver == 0: if len(witprog) == 20: return "p2wpkh" elif len(witprog) == 32: return "p2wsh" elif witver == 1 and len(witprog) == 32: return "p2tr" raise ValueError(f"Unknown SegWit address type: version={witver}, len={len(witprog)}") # Base58 try: decoded = base58.b58decode_check(address) version = decoded[0] if version in (0x00, 0x6F): # P2PKH return "p2pkh" elif version in (0x05, 0xC4): # P2SH return "p2sh" except Exception: pass raise ValueError(f"Unknown address type: {address}")Determine address type from string.
Args
address- Bitcoin address
Returns
Address type- "p2wpkh", "p2wsh", "p2tr", "p2pkh", "p2sh"
Raises
ValueError- If address is invalid or unknown type
def get_hrp(network: str | NetworkType) ‑> str-
Expand source code
def get_hrp(network: str | NetworkType) -> str: """ Get bech32 human-readable part for network. Args: network: Network type (string or enum) Returns: HRP string (bc, tb, bcrt) """ if isinstance(network, str): network = NetworkType(network) return HRP_MAP[network]Get bech32 human-readable part for network.
Args
network- Network type (string or enum)
Returns
HRP string (bc, tb, bcrt)
def get_txid(tx_hex: str) ‑> str-
Expand source code
def get_txid(tx_hex: str) -> str: """ Calculate transaction ID (double SHA256 of non-witness data). Args: tx_hex: Transaction hex Returns: Transaction ID as hex string """ parsed = parse_transaction(tx_hex) # Serialize without witness for txid calculation data = serialize_transaction( version=parsed.version, inputs=parsed.inputs, outputs=parsed.outputs, locktime=parsed.locktime, witnesses=None, # No witnesses for txid ) return hash256(data)[::-1].hex()Calculate transaction ID (double SHA256 of non-witness data).
Args
tx_hex- Transaction hex
Returns
Transaction ID as hex string
def hash160(data: bytes) ‑> bytes-
Expand source code
def hash160(data: bytes) -> bytes: """ RIPEMD160(SHA256(data)) - Used for Bitcoin addresses. Args: data: Input data to hash Returns: 20-byte hash """ return hashlib.new("ripemd160", hashlib.sha256(data).digest()).digest()RIPEMD160(SHA256(data)) - Used for Bitcoin addresses.
Args
data- Input data to hash
Returns
20-byte hash
def hash256(data: bytes) ‑> bytes-
Expand source code
def hash256(data: bytes) -> bytes: """ SHA256(SHA256(data)) - Used for Bitcoin txids and block hashes. Args: data: Input data to hash Returns: 32-byte hash """ return hashlib.sha256(hashlib.sha256(data).digest()).digest()SHA256(SHA256(data)) - Used for Bitcoin txids and block hashes.
Args
data- Input data to hash
Returns
32-byte hash
def parse_transaction(tx_hex: str) ‑> ParsedTransaction-
Expand source code
def parse_transaction(tx_hex: str) -> ParsedTransaction: """ Parse a Bitcoin transaction from hex. Handles both SegWit and non-SegWit formats. Args: tx_hex: Transaction hex string Returns: ParsedTransaction object """ tx_bytes = bytes.fromhex(tx_hex) offset = 0 # Version version = struct.unpack("<I", tx_bytes[offset : offset + 4])[0] offset += 4 # Check for SegWit marker marker = tx_bytes[offset] flag = tx_bytes[offset + 1] has_witness = marker == 0x00 and flag == 0x01 if has_witness: offset += 2 # Inputs input_count, offset = decode_varint(tx_bytes, offset) inputs = [] for _ in range(input_count): txid = tx_bytes[offset : offset + 32][::-1].hex() offset += 32 vout = struct.unpack("<I", tx_bytes[offset : offset + 4])[0] offset += 4 script_len, offset = decode_varint(tx_bytes, offset) scriptsig = tx_bytes[offset : offset + script_len].hex() offset += script_len sequence = struct.unpack("<I", tx_bytes[offset : offset + 4])[0] offset += 4 inputs.append({"txid": txid, "vout": vout, "scriptsig": scriptsig, "sequence": sequence}) # Outputs output_count, offset = decode_varint(tx_bytes, offset) outputs = [] for _ in range(output_count): value = struct.unpack("<Q", tx_bytes[offset : offset + 8])[0] offset += 8 script_len, offset = decode_varint(tx_bytes, offset) scriptpubkey = tx_bytes[offset : offset + script_len].hex() offset += script_len outputs.append({"value": value, "scriptpubkey": scriptpubkey}) # Witnesses witnesses: list[list[bytes]] = [] if has_witness: for _ in range(input_count): wit_count, offset = decode_varint(tx_bytes, offset) wit_items = [] for _ in range(wit_count): item_len, offset = decode_varint(tx_bytes, offset) wit_items.append(tx_bytes[offset : offset + item_len]) offset += item_len witnesses.append(wit_items) # Locktime locktime = struct.unpack("<I", tx_bytes[offset : offset + 4])[0] return ParsedTransaction( version=version, inputs=inputs, outputs=outputs, witnesses=witnesses, locktime=locktime, has_witness=has_witness, )Parse a Bitcoin transaction from hex.
Handles both SegWit and non-SegWit formats.
Args
tx_hex- Transaction hex string
Returns
ParsedTransaction object
def pubkey_to_p2wpkh_address(pubkey: bytes | str,
network: str | NetworkType = 'mainnet') ‑> str-
Expand source code
@validate_call def pubkey_to_p2wpkh_address(pubkey: bytes | str, network: str | NetworkType = "mainnet") -> str: """ Convert compressed public key to P2WPKH (native SegWit) address. Args: pubkey: 33-byte compressed public key (bytes or hex string) network: Network type Returns: Bech32 P2WPKH address """ if isinstance(pubkey, str): pubkey = bytes.fromhex(pubkey) if len(pubkey) != 33: raise ValueError(f"Invalid compressed pubkey length: {len(pubkey)}") pubkey_hash = hash160(pubkey) hrp = get_hrp(network) result = bech32_lib.encode(hrp, 0, pubkey_hash) if result is None: raise ValueError("Failed to encode bech32 address") return resultConvert compressed public key to P2WPKH (native SegWit) address.
Args
pubkey- 33-byte compressed public key (bytes or hex string)
network- Network type
Returns
Bech32 P2WPKH address
def pubkey_to_p2wpkh_script(pubkey: bytes | str) ‑> bytes-
Expand source code
def pubkey_to_p2wpkh_script(pubkey: bytes | str) -> bytes: """ Create P2WPKH scriptPubKey from public key. Args: pubkey: 33-byte compressed public key (bytes or hex string) Returns: 22-byte P2WPKH scriptPubKey (OP_0 <20-byte-hash>) """ if isinstance(pubkey, str): pubkey = bytes.fromhex(pubkey) pubkey_hash = hash160(pubkey) return bytes([0x00, 0x14]) + pubkey_hashCreate P2WPKH scriptPubKey from public key.
Args
pubkey- 33-byte compressed public key (bytes or hex string)
Returns
22-byte P2WPKH scriptPubKey (OP_0 <20-byte-hash>)
def sats_to_btc(sats: int) ‑> float-
Expand source code
def sats_to_btc(sats: int) -> float: """ Convert satoshis to BTC. Only use for display/output. Args: sats: Amount in satoshis Returns: Amount in BTC """ return sats / SATS_PER_BTCConvert satoshis to BTC. Only use for display/output.
Args
sats- Amount in satoshis
Returns
Amount in BTC
def script_to_p2wsh_address(script: bytes,
network: str | NetworkType = 'mainnet') ‑> str-
Expand source code
@validate_call def script_to_p2wsh_address(script: bytes, network: str | NetworkType = "mainnet") -> str: """ Convert witness script to P2WSH address. Args: script: Witness script bytes network: Network type Returns: Bech32 P2WSH address """ script_hash = sha256(script) hrp = get_hrp(network) result = bech32_lib.encode(hrp, 0, script_hash) if result is None: raise ValueError("Failed to encode bech32 address") return resultConvert witness script to P2WSH address.
Args
script- Witness script bytes
network- Network type
Returns
Bech32 P2WSH address
def script_to_p2wsh_scriptpubkey(script: bytes) ‑> bytes-
Expand source code
def script_to_p2wsh_scriptpubkey(script: bytes) -> bytes: """ Create P2WSH scriptPubKey from witness script. Args: script: Witness script bytes Returns: 34-byte P2WSH scriptPubKey (OP_0 <32-byte-hash>) """ script_hash = sha256(script) return bytes([0x00, 0x20]) + script_hashCreate P2WSH scriptPubKey from witness script.
Args
script- Witness script bytes
Returns
34-byte P2WSH scriptPubKey (OP_0 <32-byte-hash>)
def scriptpubkey_to_address(scriptpubkey: bytes,
network: str | NetworkType = 'mainnet') ‑> str-
Expand source code
@validate_call def scriptpubkey_to_address(scriptpubkey: bytes, network: str | NetworkType = "mainnet") -> str: """ Convert scriptPubKey to address. Supports P2WPKH, P2WSH, P2TR, P2PKH, P2SH. Args: scriptpubkey: scriptPubKey bytes network: Network type Returns: Bitcoin address string """ if isinstance(network, str): network = NetworkType(network) hrp = get_hrp(network) # P2WPKH if len(scriptpubkey) == 22 and scriptpubkey[0] == 0x00 and scriptpubkey[1] == 0x14: result = bech32_lib.encode(hrp, 0, scriptpubkey[2:]) if result is None: raise ValueError(f"Failed to encode P2WPKH address: {scriptpubkey.hex()}") return result # P2WSH if len(scriptpubkey) == 34 and scriptpubkey[0] == 0x00 and scriptpubkey[1] == 0x20: result = bech32_lib.encode(hrp, 0, scriptpubkey[2:]) if result is None: raise ValueError(f"Failed to encode P2WSH address: {scriptpubkey.hex()}") return result # P2TR if len(scriptpubkey) == 34 and scriptpubkey[0] == 0x51 and scriptpubkey[1] == 0x20: result = bech32_lib.encode(hrp, 1, scriptpubkey[2:]) if result is None: raise ValueError(f"Failed to encode P2TR address: {scriptpubkey.hex()}") return result # P2PKH if ( len(scriptpubkey) == 25 and scriptpubkey[0] == 0x76 and scriptpubkey[1] == 0xA9 and scriptpubkey[2] == 0x14 and scriptpubkey[23] == 0x88 and scriptpubkey[24] == 0xAC ): payload = bytes([P2PKH_VERSION[network]]) + scriptpubkey[3:23] return base58.b58encode_check(payload).decode("ascii") # P2SH if ( len(scriptpubkey) == 23 and scriptpubkey[0] == 0xA9 and scriptpubkey[1] == 0x14 and scriptpubkey[22] == 0x87 ): payload = bytes([P2SH_VERSION[network]]) + scriptpubkey[2:22] return base58.b58encode_check(payload).decode("ascii") raise ValueError(f"Unsupported scriptPubKey: {scriptpubkey.hex()}")Convert scriptPubKey to address.
Supports P2WPKH, P2WSH, P2TR, P2PKH, P2SH.
Args
scriptpubkey- scriptPubKey bytes
network- Network type
Returns
Bitcoin address string
def serialize_input(inp: TxInput,
include_scriptsig: bool = True) ‑> bytes-
Expand source code
def serialize_input(inp: TxInput, include_scriptsig: bool = True) -> bytes: """ Serialize a transaction input. Args: inp: Transaction input include_scriptsig: Whether to include scriptSig Returns: Serialized input bytes """ result = serialize_outpoint(inp.txid, inp.vout) if include_scriptsig and inp.scriptsig: scriptsig = bytes.fromhex(inp.scriptsig) result += encode_varint(len(scriptsig)) + scriptsig else: result += bytes([0x00]) # Empty scriptSig result += struct.pack("<I", inp.sequence) return resultSerialize a transaction input.
Args
inp- Transaction input
include_scriptsig- Whether to include scriptSig
Returns
Serialized input bytes
def serialize_outpoint(txid: str, vout: int) ‑> bytes-
Expand source code
def serialize_outpoint(txid: str, vout: int) -> bytes: """ Serialize outpoint (txid:vout). Args: txid: Transaction ID in RPC format (big-endian hex) vout: Output index Returns: 36-byte outpoint (little-endian txid + 4-byte vout) """ txid_bytes = bytes.fromhex(txid)[::-1] return txid_bytes + struct.pack("<I", vout)Serialize outpoint (txid:vout).
Args
txid- Transaction ID in RPC format (big-endian hex)
vout- Output index
Returns
36-byte outpoint (little-endian txid + 4-byte vout)
def serialize_output(out: TxOutput) ‑> bytes-
Expand source code
def serialize_output(out: TxOutput) -> bytes: """ Serialize a transaction output. Args: out: Transaction output Returns: Serialized output bytes """ result = struct.pack("<Q", out.value) scriptpubkey = ( bytes.fromhex(out.scriptpubkey) if out.scriptpubkey else address_to_scriptpubkey(out.address) ) result += encode_varint(len(scriptpubkey)) result += scriptpubkey return resultSerialize a transaction output.
Args
out- Transaction output
Returns
Serialized output bytes
def serialize_transaction(version: int,
inputs: list[dict[str, Any]],
outputs: list[dict[str, Any]],
locktime: int,
witnesses: list[list[bytes]] | None = None) ‑> bytes-
Expand source code
def serialize_transaction( version: int, inputs: list[dict[str, Any]], outputs: list[dict[str, Any]], locktime: int, witnesses: list[list[bytes]] | None = None, ) -> bytes: """ Serialize a Bitcoin transaction. Args: version: Transaction version inputs: List of input dicts outputs: List of output dicts locktime: Transaction locktime witnesses: Optional list of witness stacks Returns: Serialized transaction bytes """ has_witness = witnesses is not None and any(w for w in witnesses) result = struct.pack("<I", version) if has_witness: result += bytes([0x00, 0x01]) # SegWit marker and flag # Inputs result += encode_varint(len(inputs)) for inp in inputs: result += bytes.fromhex(inp["txid"])[::-1] result += struct.pack("<I", inp["vout"]) scriptsig = bytes.fromhex(inp.get("scriptsig", "")) result += encode_varint(len(scriptsig)) result += scriptsig result += struct.pack("<I", inp.get("sequence", 0xFFFFFFFF)) # Outputs result += encode_varint(len(outputs)) for out in outputs: result += struct.pack("<Q", out["value"]) scriptpubkey = bytes.fromhex(out["scriptpubkey"]) result += encode_varint(len(scriptpubkey)) result += scriptpubkey # Witnesses if has_witness and witnesses: for witness in witnesses: result += encode_varint(len(witness)) for item in witness: result += encode_varint(len(item)) result += item result += struct.pack("<I", locktime) return resultSerialize a Bitcoin transaction.
Args
version- Transaction version
inputs- List of input dicts
outputs- List of output dicts
locktime- Transaction locktime
witnesses- Optional list of witness stacks
Returns
Serialized transaction bytes
def sha256(data: bytes) ‑> bytes-
Expand source code
def sha256(data: bytes) -> bytes: """ Single SHA256 hash. Args: data: Input data to hash Returns: 32-byte hash """ return hashlib.sha256(data).digest()Single SHA256 hash.
Args
data- Input data to hash
Returns
32-byte hash
def validate_satoshi_amount(sats: int) ‑> None-
Expand source code
def validate_satoshi_amount(sats: int) -> None: """ Validate that amount is a non-negative integer. Args: sats: Amount to validate Raises: TypeError: If amount is not an integer ValueError: If amount is negative """ if not isinstance(sats, int): raise TypeError(f"Amount must be an integer (satoshis), got {type(sats)}") if sats < 0: raise ValueError(f"Amount cannot be negative, got {sats}")Validate that amount is a non-negative integer.
Args
sats- Amount to validate
Raises
TypeError- If amount is not an integer
ValueError- If amount is negative
Classes
class NetworkType (*values)-
Expand source code
class NetworkType(str, Enum): """Bitcoin network types.""" MAINNET = "mainnet" TESTNET = "testnet" SIGNET = "signet" REGTEST = "regtest"Bitcoin network types.
Ancestors
- builtins.str
- enum.Enum
Class variables
var MAINNET-
The type of the None singleton.
var REGTEST-
The type of the None singleton.
var SIGNET-
The type of the None singleton.
var TESTNET-
The type of the None singleton.
class ParsedTransaction (*args: Any, **kwargs: Any)-
Expand source code
@dataclass class ParsedTransaction: """Parsed Bitcoin transaction.""" version: int inputs: list[dict[str, Any]] outputs: list[dict[str, Any]] witnesses: list[list[bytes]] locktime: int has_witness: boolParsed Bitcoin transaction.
Instance variables
var has_witness : bool-
The type of the None singleton.
var inputs : list[dict[str, typing.Any]]-
The type of the None singleton.
var locktime : int-
The type of the None singleton.
var outputs : list[dict[str, typing.Any]]-
The type of the None singleton.
var version : int-
The type of the None singleton.
var witnesses : list[list[bytes]]-
The type of the None singleton.
class TxInput (*args: Any, **kwargs: Any)-
Expand source code
@dataclass class TxInput: """Transaction input.""" txid: str # In RPC format (big-endian hex) vout: int value: int = 0 scriptpubkey: str = "" scriptsig: str = "" sequence: int = 0xFFFFFFFFTransaction input.
Instance variables
var scriptpubkey : str-
The type of the None singleton.
var scriptsig : str-
The type of the None singleton.
var sequence : int-
The type of the None singleton.
var txid : str-
The type of the None singleton.
var value : int-
The type of the None singleton.
var vout : int-
The type of the None singleton.
class TxOutput (*args: Any, **kwargs: Any)-
Expand source code
@dataclass class TxOutput: """Transaction output.""" address: str value: int scriptpubkey: str = ""Transaction output.
Instance variables
var address : str-
The type of the None singleton.
var scriptpubkey : str-
The type of the None singleton.
var value : int-
The type of the None singleton.