Module maker.coinjoin
CoinJoin protocol handler for makers.
Manages the maker side of the CoinJoin protocol: 1. !fill - Taker requests to fill order 2. !pubkey - Maker sends commitment pubkey 3. !auth - Taker sends PoDLE proof (VERIFY!) 4. !ioauth - Maker sends selected UTXOs 5. !tx - Taker sends unsigned transaction (VERIFY!) 6. !sig - Maker sends signatures
Classes
class CoinJoinSession (taker_nick: str,
offer: Offer,
wallet: WalletService,
backend: BlockchainBackend,
min_confirmations: int = 1,
taker_utxo_retries: int = 10,
taker_utxo_age: int = 5,
taker_utxo_amtpercent: int = 20,
session_timeout_sec: int = 300,
merge_algorithm: str = 'default')-
Expand source code
class CoinJoinSession: """ Manages a single CoinJoin session with a taker. """ def __init__( self, taker_nick: str, offer: Offer, wallet: WalletService, backend: BlockchainBackend, min_confirmations: int = 1, taker_utxo_retries: int = 10, taker_utxo_age: int = 5, taker_utxo_amtpercent: int = 20, session_timeout_sec: int = 300, merge_algorithm: str = "default", ): self.taker_nick = taker_nick self.offer = offer self.wallet = wallet self.backend = backend self.min_confirmations = min_confirmations self.taker_utxo_retries = taker_utxo_retries self.taker_utxo_age = taker_utxo_age self.taker_utxo_amtpercent = taker_utxo_amtpercent self.merge_algorithm = merge_algorithm # UTXO selection strategy self.state = CoinJoinState.IDLE self.amount = 0 self.our_utxos: dict[tuple[str, int], UTXOInfo] = {} self.cj_address = "" self.change_address = "" self.mixdepth = 0 self.commitment = b"" self.taker_nacl_pk = "" # Taker's NaCl pubkey (hex) for btc_sig self.created_at = time.time() self.session_timeout_sec = session_timeout_sec # Feature detection for extended UTXO format (neutrino_compat) # Initially, we use extended format if our own backend requires it (neutrino) # This will be updated to True if taker sends extended format during !auth self.peer_neutrino_compat = backend.requires_neutrino_metadata() # E2E encryption session with taker self.crypto = CryptoSession() def is_timed_out(self) -> bool: """Check if the session has exceeded the timeout.""" return time.time() - self.created_at > self.session_timeout_sec async def handle_fill( self, amount: int, commitment: str, taker_pk: str ) -> tuple[bool, dict[str, Any]]: """ Handle !fill message from taker. Args: amount: CoinJoin amount requested commitment: PoDLE commitment (will be verified later in !auth) taker_pk: Taker's NaCl public key for E2E encryption Returns: (success, response_data) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.IDLE: return False, {"error": "Session not in IDLE state"} if amount < self.offer.minsize: return False, {"error": f"Amount too small: {amount} < {self.offer.minsize}"} if amount > self.offer.maxsize: return False, {"error": f"Amount too large: {amount} > {self.offer.maxsize}"} self.amount = amount self.commitment = bytes.fromhex(commitment) self.taker_nacl_pk = taker_pk # Store for btc_sig in handle_auth self.state = CoinJoinState.FILL_RECEIVED logger.info( f"Received !fill from {self.taker_nick}: " f"amount={amount}, commitment={commitment[:16]}..., taker_pk={taker_pk[:16]}..." ) # Set up E2E encryption with taker's NaCl pubkey try: self.crypto.setup_encryption(taker_pk) logger.debug(f"Set up encryption box with taker {self.taker_nick}") except Exception as e: logger.error(f"Failed to set up encryption with taker: {e}") return False, {"error": f"Invalid taker pubkey: {e}"} # Return our NaCl pubkey and features for E2E encryption setup # Format for !pubkey: <nacl_pubkey_hex> [features=<comma-separated>] # Features are optional - legacy peers won't send them nacl_pubkey = self.crypto.get_pubkey_hex() self.state = CoinJoinState.PUBKEY_SENT # Include features in the response # neutrino_compat: We support extended UTXO format (txid:vout:scriptpubkey:blockheight) # All modern makers can accept extended format (extra fields are simply ignored) features: list[str] = ["neutrino_compat"] return True, {"nacl_pubkey": nacl_pubkey, "features": features} except Exception as e: logger.error(f"Failed to handle !fill: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)} async def handle_auth( self, commitment: str, revelation: dict[str, Any], kphex: str ) -> tuple[bool, dict[str, Any]]: """ Handle !auth message from taker. CRITICAL SECURITY: Verifies PoDLE proof and taker's UTXO. Args: commitment: PoDLE commitment (should match from !fill) revelation: PoDLE revelation data kphex: Encryption key (hex) Returns: (success, response_data with UTXOs or error) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.PUBKEY_SENT: return False, {"error": "Session not in correct state for !auth"} commitment_bytes = bytes.fromhex(commitment) if commitment_bytes != self.commitment: return False, {"error": "Commitment mismatch"} parsed_rev = parse_podle_revelation(revelation) if not parsed_rev: return False, {"error": "Invalid PoDLE revelation format"} is_valid, error = verify_podle( parsed_rev["P"], parsed_rev["P2"], parsed_rev["sig"], parsed_rev["e"], commitment_bytes, index_range=range(self.taker_utxo_retries), ) if not is_valid: logger.warning(f"PoDLE verification failed: {error}") return False, {"error": f"PoDLE verification failed: {error}"} logger.info("PoDLE proof verified ✓") utxo_txid = parsed_rev["txid"] utxo_vout = parsed_rev["vout"] # Check for extended UTXO metadata (neutrino_compat feature) # The revelation may include scriptpubkey and blockheight taker_scriptpubkey = parsed_rev.get("scriptpubkey") taker_blockheight = parsed_rev.get("blockheight") # Track if taker sent extended format - we'll respond in kind taker_sent_extended = taker_scriptpubkey is not None and taker_blockheight is not None if taker_sent_extended: logger.debug("Taker sent extended UTXO format (neutrino_compat)") # Update our peer detection - taker supports neutrino_compat self.peer_neutrino_compat = True # Verify the taker's UTXO exists on the blockchain # Use Neutrino-compatible verification if backend requires it and metadata available if ( self.backend.requires_neutrino_metadata() and taker_scriptpubkey and taker_blockheight is not None ): # Neutrino backend: use metadata-based verification result = await self.backend.verify_utxo_with_metadata( txid=utxo_txid, vout=utxo_vout, scriptpubkey=taker_scriptpubkey, blockheight=taker_blockheight, ) if not result.valid: return False, {"error": f"Taker's UTXO verification failed: {result.error}"} taker_utxo_value = result.value taker_utxo_confirmations = result.confirmations logger.debug(f"Neutrino-verified taker's UTXO: {utxo_txid}:{utxo_vout}") else: # Full node: direct UTXO lookup taker_utxo = await self.backend.get_utxo(utxo_txid, utxo_vout) if not taker_utxo: return False, {"error": "Taker's UTXO not found on blockchain"} taker_utxo_value = taker_utxo.value taker_utxo_confirmations = taker_utxo.confirmations if taker_utxo_confirmations < self.taker_utxo_age: return False, { "error": f"Taker's UTXO too young: " f"{taker_utxo_confirmations} < {self.taker_utxo_age}" } required_amount = int(self.amount * self.taker_utxo_amtpercent / 100) if taker_utxo_value < required_amount: return False, { "error": f"Taker's UTXO too small: {taker_utxo_value} < {required_amount}" } logger.info("Taker's UTXO validated ✓") utxos_dict, cj_addr, change_addr, mixdepth = await self._select_our_utxos() if not utxos_dict: return False, {"error": "Failed to select UTXOs"} self.our_utxos = utxos_dict self.cj_address = cj_addr self.change_address = change_addr self.mixdepth = mixdepth # Format UTXOs: extended format (neutrino_compat) includes scriptpubkey:blockheight # Legacy format is just txid:vout utxo_metadata_list = [ UTXOMetadata( txid=txid, vout=vout, scriptpubkey=utxo_info.scriptpubkey, blockheight=utxo_info.height, ) for (txid, vout), utxo_info in utxos_dict.items() ] # Use extended format if peer supports neutrino_compat utxo_list_str = format_utxo_list(utxo_metadata_list, extended=self.peer_neutrino_compat) if self.peer_neutrino_compat: logger.debug("Using extended UTXO format for neutrino_compat peer") else: logger.debug("Using legacy UTXO format for legacy peer") # Get EC key for our first UTXO to sign taker's encryption key # This proves we own the UTXO we're contributing first_utxo_key, first_utxo_info = next(iter(utxos_dict.items())) auth_address = first_utxo_info.address auth_hd_key = self.wallet.get_key_for_address(auth_address) if auth_hd_key is None: return False, {"error": f"Could not get key for address {auth_address}"} # Get our EC pubkey (compressed) auth_pub_bytes = auth_hd_key.get_public_key_bytes() # Sign OUR OWN NaCl pubkey (hex string) with our EC key # This proves to the taker that we own the UTXO and links it to our encryption identity from jmcore.crypto import ecdsa_sign our_nacl_pk_hex = self.crypto.get_pubkey_hex() btc_sig = ecdsa_sign(our_nacl_pk_hex, auth_hd_key.get_private_key_bytes()) response = { "utxo_list": utxo_list_str, "auth_pub": auth_pub_bytes.hex(), "cj_addr": cj_addr, "change_addr": change_addr, "btc_sig": btc_sig, } self.state = CoinJoinState.IOAUTH_SENT logger.info(f"Sent !ioauth with {len(utxos_dict)} UTXOs") return True, response except Exception as e: logger.error(f"Failed to handle !auth: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)} async def handle_tx(self, tx_hex: str) -> tuple[bool, dict[str, Any]]: """ Handle !tx message from taker. CRITICAL SECURITY: Verifies unsigned transaction before signing! Args: tx_hex: Unsigned transaction hex Returns: (success, response_data with signatures or error) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.IOAUTH_SENT: return False, {"error": "Session not in correct state for !tx"} logger.info(f"Received !tx from {self.taker_nick}, verifying...") # Convert network string to NetworkType enum network = NetworkType(self.wallet.network) is_valid, error = verify_unsigned_transaction( tx_hex=tx_hex, our_utxos=self.our_utxos, cj_address=self.cj_address, change_address=self.change_address, amount=self.amount, cjfee=self.offer.cjfee, txfee=self.offer.txfee, offer_type=self.offer.ordertype, network=network, ) if not is_valid: logger.error(f"Transaction verification FAILED: {error}") self.state = CoinJoinState.FAILED return False, {"error": f"Transaction verification failed: {error}"} logger.info("Transaction verification PASSED ✓") signatures = await self._sign_transaction(tx_hex) # type: ignore[arg-type] if not signatures: return False, {"error": "Failed to sign transaction"} # Compute txid from the unsigned transaction for history tracking # The txid is computed from the non-witness data so we can calculate it now from jmcore.bitcoin import get_txid txid = get_txid(tx_hex) response = {"signatures": signatures, "txid": txid} self.state = CoinJoinState.SIG_SENT logger.info(f"Sent !sig with {len(signatures)} signatures (txid: {txid[:16]}...)") return True, response except Exception as e: logger.error(f"Failed to handle !tx: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)} async def _select_our_utxos( self, ) -> tuple[dict[tuple[str, int], UTXOInfo], str, str, int]: """ Select our UTXOs for the CoinJoin. Uses the configured merge_algorithm to determine UTXO selection: - default: Minimum UTXOs needed - gradual: +1 additional UTXO - greedy: ALL UTXOs from the mixdepth - random: +0 to +2 additional UTXOs Returns: (utxos_dict, cj_address, change_address, mixdepth) """ try: from jmcore.models import OfferType real_cjfee = 0 if self.offer.ordertype in (OfferType.SW0_ABSOLUTE, OfferType.SWA_ABSOLUTE): real_cjfee = int(self.offer.cjfee) else: from jmcore.bitcoin import calculate_relative_fee real_cjfee = calculate_relative_fee(self.amount, str(self.offer.cjfee)) total_amount = self.amount + self.offer.txfee required_amount = total_amount + 10000 - real_cjfee balances = {} for md in range(self.wallet.mixdepth_count): balance = await self.wallet.get_balance(md) balances[md] = balance eligible_mixdepths = {md: bal for md, bal in balances.items() if bal >= required_amount} if not eligible_mixdepths: logger.error(f"No mixdepth with sufficient balance: need {required_amount}") return {}, "", "", -1 max_mixdepth = max(eligible_mixdepths, key=lambda md: eligible_mixdepths[md]) # Use merge algorithm for UTXO selection # Makers can consolidate UTXOs "for free" since takers pay all fees selected = self.wallet.select_utxos_with_merge( max_mixdepth, required_amount, self.min_confirmations, merge_algorithm=self.merge_algorithm, ) utxos_dict = {(utxo.txid, utxo.vout): utxo for utxo in selected} cj_output_mixdepth = (max_mixdepth + 1) % self.wallet.mixdepth_count cj_index = self.wallet.get_next_address_index(cj_output_mixdepth, 1) cj_address = self.wallet.get_change_address(cj_output_mixdepth, cj_index) change_index = self.wallet.get_next_address_index(max_mixdepth, 1) change_address = self.wallet.get_change_address(max_mixdepth, change_index) logger.info( f"Selected {len(selected)} UTXOs from mixdepth {max_mixdepth} " f"(merge_algorithm={self.merge_algorithm}), " f"total value: {sum(u.value for u in selected)} sats" ) return utxos_dict, cj_address, change_address, max_mixdepth except Exception as e: logger.error(f"Failed to select UTXOs: {e}") return {}, "", "", -1 async def _sign_transaction(self, tx_hex: str) -> list[str]: """Sign our inputs in the transaction. Returns list of base64-encoded signatures in JM format. Each signature is: base64(varint(sig_len) + sig + varint(pub_len) + pub) This matches the CScript serialization format. """ import base64 try: tx_bytes = bytes.fromhex(tx_hex) tx = deserialize_transaction(tx_bytes) signatures: list[str] = [] # Build a map of (txid, vout) -> input index for the transaction # Note: txid in tx.inputs is little-endian bytes, need to convert input_index_map: dict[tuple[str, int], int] = {} for idx, tx_input in enumerate(tx.inputs): # Convert little-endian txid bytes to big-endian hex string (RPC format) txid_hex = tx_input.txid_le[::-1].hex() input_index_map[(txid_hex, tx_input.vout)] = idx for (txid, vout), utxo_info in self.our_utxos.items(): # Find the input index in the transaction utxo_key = (txid, vout) if utxo_key not in input_index_map: logger.error(f"Our UTXO {txid}:{vout} not found in transaction inputs") continue input_index = input_index_map[utxo_key] # Safety check: Fidelity bond (P2WSH) UTXOs should never be in CoinJoins if utxo_info.is_p2wsh: raise TransactionSigningError( f"Cannot sign P2WSH UTXO {txid}:{vout} in CoinJoin - " f"fidelity bond UTXOs cannot be used in CoinJoins" ) key = self.wallet.get_key_for_address(utxo_info.address) if not key: raise TransactionSigningError(f"Missing key for address {utxo_info.address}") priv_key = key.private_key pubkey_bytes = key.get_public_key_bytes(compressed=True) logger.debug( f"Signing UTXO {txid}:{vout} at input_index={input_index}, " f"value={utxo_info.value}, address={utxo_info.address}, " f"pubkey={pubkey_bytes.hex()[:16]}..." ) script_code = create_p2wpkh_script_code(pubkey_bytes) signature = sign_p2wpkh_input( tx=tx, input_index=input_index, script_code=script_code, value=utxo_info.value, private_key=priv_key, ) # Format as CScript: varint(sig_len) + sig + varint(pub_len) + pub # For lengths < 0x4c (76), varint is just the length byte sig_len = len(signature) pub_len = len(pubkey_bytes) # Build the sigmsg in JM format sigmsg = bytes([sig_len]) + signature + bytes([pub_len]) + pubkey_bytes # Base64 encode for transmission sig_b64 = base64.b64encode(sigmsg).decode("ascii") signatures.append(sig_b64) logger.debug(f"Signed input {input_index} for UTXO {txid}:{vout}") return signatures except TransactionSigningError as e: logger.error(f"Signing error: {e}") return [] except Exception as e: logger.error(f"Failed to sign transaction: {e}") return []Manages a single CoinJoin session with a taker.
Methods
async def handle_auth(self, commitment: str, revelation: dict[str, Any], kphex: str) ‑> tuple[bool, dict[str, typing.Any]]-
Expand source code
async def handle_auth( self, commitment: str, revelation: dict[str, Any], kphex: str ) -> tuple[bool, dict[str, Any]]: """ Handle !auth message from taker. CRITICAL SECURITY: Verifies PoDLE proof and taker's UTXO. Args: commitment: PoDLE commitment (should match from !fill) revelation: PoDLE revelation data kphex: Encryption key (hex) Returns: (success, response_data with UTXOs or error) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.PUBKEY_SENT: return False, {"error": "Session not in correct state for !auth"} commitment_bytes = bytes.fromhex(commitment) if commitment_bytes != self.commitment: return False, {"error": "Commitment mismatch"} parsed_rev = parse_podle_revelation(revelation) if not parsed_rev: return False, {"error": "Invalid PoDLE revelation format"} is_valid, error = verify_podle( parsed_rev["P"], parsed_rev["P2"], parsed_rev["sig"], parsed_rev["e"], commitment_bytes, index_range=range(self.taker_utxo_retries), ) if not is_valid: logger.warning(f"PoDLE verification failed: {error}") return False, {"error": f"PoDLE verification failed: {error}"} logger.info("PoDLE proof verified ✓") utxo_txid = parsed_rev["txid"] utxo_vout = parsed_rev["vout"] # Check for extended UTXO metadata (neutrino_compat feature) # The revelation may include scriptpubkey and blockheight taker_scriptpubkey = parsed_rev.get("scriptpubkey") taker_blockheight = parsed_rev.get("blockheight") # Track if taker sent extended format - we'll respond in kind taker_sent_extended = taker_scriptpubkey is not None and taker_blockheight is not None if taker_sent_extended: logger.debug("Taker sent extended UTXO format (neutrino_compat)") # Update our peer detection - taker supports neutrino_compat self.peer_neutrino_compat = True # Verify the taker's UTXO exists on the blockchain # Use Neutrino-compatible verification if backend requires it and metadata available if ( self.backend.requires_neutrino_metadata() and taker_scriptpubkey and taker_blockheight is not None ): # Neutrino backend: use metadata-based verification result = await self.backend.verify_utxo_with_metadata( txid=utxo_txid, vout=utxo_vout, scriptpubkey=taker_scriptpubkey, blockheight=taker_blockheight, ) if not result.valid: return False, {"error": f"Taker's UTXO verification failed: {result.error}"} taker_utxo_value = result.value taker_utxo_confirmations = result.confirmations logger.debug(f"Neutrino-verified taker's UTXO: {utxo_txid}:{utxo_vout}") else: # Full node: direct UTXO lookup taker_utxo = await self.backend.get_utxo(utxo_txid, utxo_vout) if not taker_utxo: return False, {"error": "Taker's UTXO not found on blockchain"} taker_utxo_value = taker_utxo.value taker_utxo_confirmations = taker_utxo.confirmations if taker_utxo_confirmations < self.taker_utxo_age: return False, { "error": f"Taker's UTXO too young: " f"{taker_utxo_confirmations} < {self.taker_utxo_age}" } required_amount = int(self.amount * self.taker_utxo_amtpercent / 100) if taker_utxo_value < required_amount: return False, { "error": f"Taker's UTXO too small: {taker_utxo_value} < {required_amount}" } logger.info("Taker's UTXO validated ✓") utxos_dict, cj_addr, change_addr, mixdepth = await self._select_our_utxos() if not utxos_dict: return False, {"error": "Failed to select UTXOs"} self.our_utxos = utxos_dict self.cj_address = cj_addr self.change_address = change_addr self.mixdepth = mixdepth # Format UTXOs: extended format (neutrino_compat) includes scriptpubkey:blockheight # Legacy format is just txid:vout utxo_metadata_list = [ UTXOMetadata( txid=txid, vout=vout, scriptpubkey=utxo_info.scriptpubkey, blockheight=utxo_info.height, ) for (txid, vout), utxo_info in utxos_dict.items() ] # Use extended format if peer supports neutrino_compat utxo_list_str = format_utxo_list(utxo_metadata_list, extended=self.peer_neutrino_compat) if self.peer_neutrino_compat: logger.debug("Using extended UTXO format for neutrino_compat peer") else: logger.debug("Using legacy UTXO format for legacy peer") # Get EC key for our first UTXO to sign taker's encryption key # This proves we own the UTXO we're contributing first_utxo_key, first_utxo_info = next(iter(utxos_dict.items())) auth_address = first_utxo_info.address auth_hd_key = self.wallet.get_key_for_address(auth_address) if auth_hd_key is None: return False, {"error": f"Could not get key for address {auth_address}"} # Get our EC pubkey (compressed) auth_pub_bytes = auth_hd_key.get_public_key_bytes() # Sign OUR OWN NaCl pubkey (hex string) with our EC key # This proves to the taker that we own the UTXO and links it to our encryption identity from jmcore.crypto import ecdsa_sign our_nacl_pk_hex = self.crypto.get_pubkey_hex() btc_sig = ecdsa_sign(our_nacl_pk_hex, auth_hd_key.get_private_key_bytes()) response = { "utxo_list": utxo_list_str, "auth_pub": auth_pub_bytes.hex(), "cj_addr": cj_addr, "change_addr": change_addr, "btc_sig": btc_sig, } self.state = CoinJoinState.IOAUTH_SENT logger.info(f"Sent !ioauth with {len(utxos_dict)} UTXOs") return True, response except Exception as e: logger.error(f"Failed to handle !auth: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)}Handle !auth message from taker.
CRITICAL SECURITY: Verifies PoDLE proof and taker's UTXO.
Args
commitment- PoDLE commitment (should match from !fill)
revelation- PoDLE revelation data
kphex- Encryption key (hex)
Returns
(success, response_data with UTXOs or error)
async def handle_fill(self, amount: int, commitment: str, taker_pk: str) ‑> tuple[bool, dict[str, typing.Any]]-
Expand source code
async def handle_fill( self, amount: int, commitment: str, taker_pk: str ) -> tuple[bool, dict[str, Any]]: """ Handle !fill message from taker. Args: amount: CoinJoin amount requested commitment: PoDLE commitment (will be verified later in !auth) taker_pk: Taker's NaCl public key for E2E encryption Returns: (success, response_data) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.IDLE: return False, {"error": "Session not in IDLE state"} if amount < self.offer.minsize: return False, {"error": f"Amount too small: {amount} < {self.offer.minsize}"} if amount > self.offer.maxsize: return False, {"error": f"Amount too large: {amount} > {self.offer.maxsize}"} self.amount = amount self.commitment = bytes.fromhex(commitment) self.taker_nacl_pk = taker_pk # Store for btc_sig in handle_auth self.state = CoinJoinState.FILL_RECEIVED logger.info( f"Received !fill from {self.taker_nick}: " f"amount={amount}, commitment={commitment[:16]}..., taker_pk={taker_pk[:16]}..." ) # Set up E2E encryption with taker's NaCl pubkey try: self.crypto.setup_encryption(taker_pk) logger.debug(f"Set up encryption box with taker {self.taker_nick}") except Exception as e: logger.error(f"Failed to set up encryption with taker: {e}") return False, {"error": f"Invalid taker pubkey: {e}"} # Return our NaCl pubkey and features for E2E encryption setup # Format for !pubkey: <nacl_pubkey_hex> [features=<comma-separated>] # Features are optional - legacy peers won't send them nacl_pubkey = self.crypto.get_pubkey_hex() self.state = CoinJoinState.PUBKEY_SENT # Include features in the response # neutrino_compat: We support extended UTXO format (txid:vout:scriptpubkey:blockheight) # All modern makers can accept extended format (extra fields are simply ignored) features: list[str] = ["neutrino_compat"] return True, {"nacl_pubkey": nacl_pubkey, "features": features} except Exception as e: logger.error(f"Failed to handle !fill: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)}Handle !fill message from taker.
Args
amount- CoinJoin amount requested
commitment- PoDLE commitment (will be verified later in !auth)
taker_pk- Taker's NaCl public key for E2E encryption
Returns
(success, response_data)
async def handle_tx(self, tx_hex: str) ‑> tuple[bool, dict[str, typing.Any]]-
Expand source code
async def handle_tx(self, tx_hex: str) -> tuple[bool, dict[str, Any]]: """ Handle !tx message from taker. CRITICAL SECURITY: Verifies unsigned transaction before signing! Args: tx_hex: Unsigned transaction hex Returns: (success, response_data with signatures or error) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.IOAUTH_SENT: return False, {"error": "Session not in correct state for !tx"} logger.info(f"Received !tx from {self.taker_nick}, verifying...") # Convert network string to NetworkType enum network = NetworkType(self.wallet.network) is_valid, error = verify_unsigned_transaction( tx_hex=tx_hex, our_utxos=self.our_utxos, cj_address=self.cj_address, change_address=self.change_address, amount=self.amount, cjfee=self.offer.cjfee, txfee=self.offer.txfee, offer_type=self.offer.ordertype, network=network, ) if not is_valid: logger.error(f"Transaction verification FAILED: {error}") self.state = CoinJoinState.FAILED return False, {"error": f"Transaction verification failed: {error}"} logger.info("Transaction verification PASSED ✓") signatures = await self._sign_transaction(tx_hex) # type: ignore[arg-type] if not signatures: return False, {"error": "Failed to sign transaction"} # Compute txid from the unsigned transaction for history tracking # The txid is computed from the non-witness data so we can calculate it now from jmcore.bitcoin import get_txid txid = get_txid(tx_hex) response = {"signatures": signatures, "txid": txid} self.state = CoinJoinState.SIG_SENT logger.info(f"Sent !sig with {len(signatures)} signatures (txid: {txid[:16]}...)") return True, response except Exception as e: logger.error(f"Failed to handle !tx: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)}Handle !tx message from taker.
CRITICAL SECURITY: Verifies unsigned transaction before signing!
Args
tx_hex- Unsigned transaction hex
Returns
(success, response_data with signatures or error)
def is_timed_out(self) ‑> bool-
Expand source code
def is_timed_out(self) -> bool: """Check if the session has exceeded the timeout.""" return time.time() - self.created_at > self.session_timeout_secCheck if the session has exceeded the timeout.
class CoinJoinState (*values)-
Expand source code
class CoinJoinState(str, Enum): """CoinJoin session states""" IDLE = "idle" FILL_RECEIVED = "fill_received" PUBKEY_SENT = "pubkey_sent" AUTH_RECEIVED = "auth_received" IOAUTH_SENT = "ioauth_sent" TX_RECEIVED = "tx_received" SIG_SENT = "sig_sent" COMPLETE = "complete" FAILED = "failed"CoinJoin session states
Ancestors
- builtins.str
- enum.Enum
Class variables
var AUTH_RECEIVED-
The type of the None singleton.
var COMPLETE-
The type of the None singleton.
var FAILED-
The type of the None singleton.
var FILL_RECEIVED-
The type of the None singleton.
var IDLE-
The type of the None singleton.
var IOAUTH_SENT-
The type of the None singleton.
var PUBKEY_SENT-
The type of the None singleton.
var SIG_SENT-
The type of the None singleton.
var TX_RECEIVED-
The type of the None singleton.