Module maker.coinjoin
CoinJoin protocol handler for makers.
Manages the maker side of the CoinJoin protocol: 1. !fill - Taker requests to fill order 2. !pubkey - Maker sends commitment pubkey 3. !auth - Taker sends PoDLE proof (VERIFY!) 4. !ioauth - Maker sends selected UTXOs 5. !tx - Taker sends unsigned transaction (VERIFY!) 6. !sig - Maker sends signatures
Classes
class CoinJoinSession (taker_nick: str,
offer: Offer,
wallet: WalletService,
backend: BlockchainBackend,
min_confirmations: int = 1,
taker_utxo_retries: int = 10,
taker_utxo_age: int = 5,
taker_utxo_amtpercent: int = 20,
session_timeout_sec: int = 300,
merge_algorithm: str = 'default')-
Expand source code
class CoinJoinSession: """ Manages a single CoinJoin session with a taker. """ def __init__( self, taker_nick: str, offer: Offer, wallet: WalletService, backend: BlockchainBackend, min_confirmations: int = 1, taker_utxo_retries: int = 10, taker_utxo_age: int = 5, taker_utxo_amtpercent: int = 20, session_timeout_sec: int = 300, merge_algorithm: str = "default", ): self.taker_nick = taker_nick self.offer = offer self.wallet = wallet self.backend = backend self.min_confirmations = min_confirmations self.taker_utxo_retries = taker_utxo_retries self.taker_utxo_age = taker_utxo_age self.taker_utxo_amtpercent = taker_utxo_amtpercent self.merge_algorithm = merge_algorithm # UTXO selection strategy self.state = CoinJoinState.IDLE self.amount = 0 self.our_utxos: dict[tuple[str, int], UTXOInfo] = {} self.cj_address = "" self.change_address = "" self.mixdepth = 0 self.commitment = b"" self.taker_nacl_pk = "" # Taker's NaCl pubkey (hex) for btc_sig self.created_at = time.time() self.session_timeout_sec = session_timeout_sec self.comm_channel = "" # Track communication channel ("direct" or "dir:<node_id>") # Feature detection for extended UTXO format (neutrino_compat) # Initially, we use extended format if our own backend requires it (neutrino) # This will be updated to True if taker sends extended format during !auth self.peer_neutrino_compat = backend.requires_neutrino_metadata() # E2E encryption session with taker self.crypto = CryptoSession() def is_timed_out(self) -> bool: """Check if the session has exceeded the timeout.""" return time.time() - self.created_at > self.session_timeout_sec def _get_channel_type(self, source: str) -> str: """Extract channel type from source string. The JoinMarket protocol allows messages to arrive via different directory servers (takers broadcast to all directories), so we only track "direct" vs "directory" to prevent mixing those two channel types. Args: source: Message source ("direct" or "dir:<node_id>") Returns: "direct" or "directory" """ if source == "direct": return "direct" if source.startswith("dir:"): return "directory" # Unknown source type, treat as its own type for safety return source def validate_channel(self, source: str) -> bool: """ Validate that message comes from the same channel TYPE as the session. We only check that "direct" vs "directory" are not mixed. Messages arriving from different directory servers (dir:serverA vs dir:serverB) are expected because takers broadcast to ALL directory servers. Mixing channel types (e.g., !fill via directory, !auth via direct) could indicate: - Session confusion attack - Accidental misconfiguration - Network issues causing routing inconsistency Args: source: Message source ("direct" or "dir:<node_id>") Returns: True if channel is valid, False if it violates consistency """ source_type = self._get_channel_type(source) if not self.comm_channel: # First message - record the channel type self.comm_channel = source_type logger.debug(f"Session with {self.taker_nick} established on channel: {source_type}") return True if self.comm_channel != source_type: logger.warning( f"Channel consistency violation for {self.taker_nick}: " f"session started on '{self.comm_channel}', " f"received message on '{source_type}'" ) return False return True async def handle_fill( self, amount: int, commitment: str, taker_pk: str ) -> tuple[bool, dict[str, Any]]: """ Handle !fill message from taker. Args: amount: CoinJoin amount requested commitment: PoDLE commitment (will be verified later in !auth) taker_pk: Taker's NaCl public key for E2E encryption Returns: (success, response_data) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.IDLE: return False, {"error": "Session not in IDLE state"} if amount < self.offer.minsize: return False, {"error": f"Amount too small: {amount} < {self.offer.minsize}"} if amount > self.offer.maxsize: return False, {"error": f"Amount too large: {amount} > {self.offer.maxsize}"} self.amount = amount self.commitment = bytes.fromhex(commitment) self.taker_nacl_pk = taker_pk # Store for btc_sig in handle_auth self.state = CoinJoinState.FILL_RECEIVED logger.info( f"Received !fill from {self.taker_nick}: " f"amount={amount}, commitment={commitment[:16]}..., taker_pk={taker_pk[:16]}..." ) # Set up E2E encryption with taker's NaCl pubkey try: self.crypto.setup_encryption(taker_pk) logger.debug(f"Set up encryption box with taker {self.taker_nick}") except Exception as e: logger.error(f"Failed to set up encryption with taker: {e}") return False, {"error": f"Invalid taker pubkey: {e}"} # Return our NaCl pubkey and features for E2E encryption setup # Format for !pubkey: <nacl_pubkey_hex> [features=<comma-separated>] # Features are optional - legacy peers won't send them nacl_pubkey = self.crypto.get_pubkey_hex() self.state = CoinJoinState.PUBKEY_SENT # Include features in the response # neutrino_compat: We support extended UTXO format (txid:vout:scriptpubkey:blockheight) # All modern makers can accept extended format (extra fields are simply ignored) features: list[str] = ["neutrino_compat"] return True, {"nacl_pubkey": nacl_pubkey, "features": features} except Exception as e: logger.error(f"Failed to handle !fill: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)} async def handle_auth( self, commitment: str, revelation: dict[str, Any], kphex: str ) -> tuple[bool, dict[str, Any]]: """ Handle !auth message from taker. CRITICAL SECURITY: Verifies PoDLE proof and taker's UTXO. Args: commitment: PoDLE commitment (should match from !fill) revelation: PoDLE revelation data kphex: Encryption key (hex) Returns: (success, response_data with UTXOs or error) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.PUBKEY_SENT: return False, {"error": "Session not in correct state for !auth"} commitment_bytes = bytes.fromhex(commitment) if commitment_bytes != self.commitment: logger.debug( f"Commitment mismatch: received={commitment[:16]}..., " f"expected={self.commitment.hex()[:16]}..." ) return False, {"error": "Commitment mismatch"} parsed_rev = parse_podle_revelation(revelation) if not parsed_rev: logger.debug(f"Failed to parse PoDLE revelation: {revelation}") return False, {"error": "Invalid PoDLE revelation format"} # Log PoDLE verification inputs at TRACE level logger.trace( f"PoDLE verification inputs: P={parsed_rev['P'].hex()[:32]}..., " f"P2={parsed_rev['P2'].hex()[:32]}..., sig={parsed_rev['sig'].hex()[:32]}..., " f"e={parsed_rev['e'].hex()[:16]}..., commitment={commitment[:16]}..." ) is_valid, error = verify_podle( parsed_rev["P"], parsed_rev["P2"], parsed_rev["sig"], parsed_rev["e"], commitment_bytes, index_range=range(self.taker_utxo_retries), ) if not is_valid: utxo_str = f"{parsed_rev['txid'][:16]}...:{parsed_rev['vout']}" logger.warning( f"PoDLE verification failed for {self.taker_nick}: {error} " f"(commitment={commitment[:16]}..., utxo={utxo_str})" ) return False, {"error": f"PoDLE verification failed: {error}"} logger.info("PoDLE proof verified ✓") logger.debug( f"PoDLE details: taker={self.taker_nick}, " f"utxo={parsed_rev['txid']}:{parsed_rev['vout']}, " f"commitment={commitment}" ) utxo_txid = parsed_rev["txid"] utxo_vout = parsed_rev["vout"] # Check for extended UTXO metadata (neutrino_compat feature) # The revelation may include scriptpubkey and blockheight taker_scriptpubkey = parsed_rev.get("scriptpubkey") taker_blockheight = parsed_rev.get("blockheight") # Track if taker sent extended format - we'll respond in kind taker_sent_extended = taker_scriptpubkey is not None and taker_blockheight is not None if taker_sent_extended: logger.debug("Taker sent extended UTXO format (neutrino_compat)") # Update our peer detection - taker supports neutrino_compat self.peer_neutrino_compat = True # Verify the taker's UTXO exists on the blockchain # Use Neutrino-compatible verification if backend requires it and metadata available if ( self.backend.requires_neutrino_metadata() and taker_scriptpubkey and taker_blockheight is not None ): # Neutrino backend: use metadata-based verification result = await self.backend.verify_utxo_with_metadata( txid=utxo_txid, vout=utxo_vout, scriptpubkey=taker_scriptpubkey, blockheight=taker_blockheight, ) if not result.valid: return False, {"error": f"Taker's UTXO verification failed: {result.error}"} taker_utxo_value = result.value taker_utxo_confirmations = result.confirmations logger.debug(f"Neutrino-verified taker's UTXO: {utxo_txid}:{utxo_vout}") else: # Full node: direct UTXO lookup taker_utxo = await self.backend.get_utxo(utxo_txid, utxo_vout) if not taker_utxo: return False, {"error": "Taker's UTXO not found on blockchain"} taker_utxo_value = taker_utxo.value taker_utxo_confirmations = taker_utxo.confirmations if taker_utxo_confirmations < self.taker_utxo_age: logger.debug( f"Taker UTXO too young: {utxo_txid}:{utxo_vout} has " f"{taker_utxo_confirmations} confirmations, need {self.taker_utxo_age}" ) return False, { "error": f"Taker's UTXO too young: " f"{taker_utxo_confirmations} < {self.taker_utxo_age}" } required_amount = int(self.amount * self.taker_utxo_amtpercent / 100) if taker_utxo_value < required_amount: logger.debug( f"Taker UTXO too small: {utxo_txid}:{utxo_vout} has " f"{taker_utxo_value} sats, need {required_amount} sats " f"({self.taker_utxo_amtpercent}% of {self.amount})" ) return False, { "error": f"Taker's UTXO too small: {taker_utxo_value} < {required_amount}" } logger.info("Taker's UTXO validated ✓") logger.debug( f"Taker UTXO details: {utxo_txid}:{utxo_vout}, " f"value={taker_utxo_value} sats, confirmations={taker_utxo_confirmations}" ) utxos_dict, cj_addr, change_addr, mixdepth = await self._select_our_utxos() if not utxos_dict: return False, {"error": "Failed to select UTXOs"} self.our_utxos = utxos_dict self.cj_address = cj_addr self.change_address = change_addr self.mixdepth = mixdepth # Format UTXOs: extended format (neutrino_compat) includes scriptpubkey:blockheight # Legacy format is just txid:vout utxo_metadata_list = [ UTXOMetadata( txid=txid, vout=vout, scriptpubkey=utxo_info.scriptpubkey, blockheight=utxo_info.height, ) for (txid, vout), utxo_info in utxos_dict.items() ] # Use extended format if peer supports neutrino_compat utxo_list_str = format_utxo_list(utxo_metadata_list, extended=self.peer_neutrino_compat) if self.peer_neutrino_compat: logger.debug("Using extended UTXO format for neutrino_compat peer") else: logger.debug("Using legacy UTXO format for legacy peer") # Get EC key for our first UTXO to sign taker's encryption key # This proves we own the UTXO we're contributing first_utxo_key, first_utxo_info = next(iter(utxos_dict.items())) auth_address = first_utxo_info.address auth_hd_key = self.wallet.get_key_for_address(auth_address) if auth_hd_key is None: return False, {"error": f"Could not get key for address {auth_address}"} # Get our EC pubkey (compressed) auth_pub_bytes = auth_hd_key.get_public_key_bytes() # Sign OUR OWN NaCl pubkey (hex string) with our EC key # This proves to the taker that we own the UTXO and links it to our encryption identity from jmcore.crypto import ecdsa_sign our_nacl_pk_hex = self.crypto.get_pubkey_hex() btc_sig = ecdsa_sign(our_nacl_pk_hex, auth_hd_key.get_private_key_bytes()) response = { "utxo_list": utxo_list_str, "auth_pub": auth_pub_bytes.hex(), "cj_addr": cj_addr, "change_addr": change_addr, "btc_sig": btc_sig, } self.state = CoinJoinState.IOAUTH_SENT logger.info(f"Sent !ioauth with {len(utxos_dict)} UTXOs") return True, response except Exception as e: logger.error(f"Failed to handle !auth: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)} async def handle_tx(self, tx_hex: str) -> tuple[bool, dict[str, Any]]: """ Handle !tx message from taker. CRITICAL SECURITY: Verifies unsigned transaction before signing! Args: tx_hex: Unsigned transaction hex Returns: (success, response_data with signatures or error) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.IOAUTH_SENT: return False, {"error": "Session not in correct state for !tx"} logger.info(f"Received !tx from {self.taker_nick}, verifying...") # Convert network string to NetworkType enum network = NetworkType(self.wallet.network) is_valid, error = verify_unsigned_transaction( tx_hex=tx_hex, our_utxos=self.our_utxos, cj_address=self.cj_address, change_address=self.change_address, amount=self.amount, cjfee=self.offer.cjfee, txfee=self.offer.txfee, offer_type=self.offer.ordertype, network=network, ) if not is_valid: logger.error(f"Transaction verification FAILED: {error}") self.state = CoinJoinState.FAILED return False, {"error": f"Transaction verification failed: {error}"} logger.info("Transaction verification PASSED ✓") signatures = await self._sign_transaction(tx_hex) # type: ignore[arg-type] if not signatures: return False, {"error": "Failed to sign transaction"} # Compute txid from the unsigned transaction for history tracking # The txid is computed from the non-witness data so we can calculate it now from jmcore.bitcoin import get_txid txid = get_txid(tx_hex) response = {"signatures": signatures, "txid": txid} self.state = CoinJoinState.SIG_SENT logger.info(f"Sent !sig with {len(signatures)} signatures (txid: {txid[:16]}...)") return True, response except Exception as e: logger.error(f"Failed to handle !tx: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)} async def _select_our_utxos( self, ) -> tuple[dict[tuple[str, int], UTXOInfo], str, str, int]: """ Select our UTXOs for the CoinJoin. Uses the configured merge_algorithm to determine UTXO selection: - default: Minimum UTXOs needed - gradual: +1 additional UTXO - greedy: ALL UTXOs from the mixdepth - random: +0 to +2 additional UTXOs Returns: (utxos_dict, cj_address, change_address, mixdepth) """ try: from jmcore.models import OfferType real_cjfee = 0 if self.offer.ordertype in (OfferType.SW0_ABSOLUTE, OfferType.SWA_ABSOLUTE): real_cjfee = int(self.offer.cjfee) else: from jmcore.bitcoin import calculate_relative_fee real_cjfee = calculate_relative_fee(self.amount, str(self.offer.cjfee)) total_amount = self.amount + self.offer.txfee required_amount = total_amount + 10000 - real_cjfee balances = {} for md in range(self.wallet.mixdepth_count): balance = await self.wallet.get_balance(md) balances[md] = balance eligible_mixdepths = {md: bal for md, bal in balances.items() if bal >= required_amount} if not eligible_mixdepths: logger.error(f"No mixdepth with sufficient balance: need {required_amount}") return {}, "", "", -1 max_mixdepth = max(eligible_mixdepths, key=lambda md: eligible_mixdepths[md]) # Use merge algorithm for UTXO selection # Makers can consolidate UTXOs "for free" since takers pay all fees selected = self.wallet.select_utxos_with_merge( max_mixdepth, required_amount, self.min_confirmations, merge_algorithm=self.merge_algorithm, ) utxos_dict = {(utxo.txid, utxo.vout): utxo for utxo in selected} cj_output_mixdepth = (max_mixdepth + 1) % self.wallet.mixdepth_count cj_index = self.wallet.get_next_address_index(cj_output_mixdepth, 1) cj_address = self.wallet.get_change_address(cj_output_mixdepth, cj_index) change_index = self.wallet.get_next_address_index(max_mixdepth, 1) change_address = self.wallet.get_change_address(max_mixdepth, change_index) # Reserve addresses immediately after selection to prevent reuse # in concurrent CoinJoin sessions. Once shared with a taker, addresses # must never be reused even if the CoinJoin fails. self.wallet.reserve_addresses({cj_address, change_address}) logger.info( f"Selected {len(selected)} UTXOs from mixdepth {max_mixdepth} " f"(merge_algorithm={self.merge_algorithm}), " f"total value: {sum(u.value for u in selected)} sats" ) return utxos_dict, cj_address, change_address, max_mixdepth except Exception as e: logger.error(f"Failed to select UTXOs: {e}") return {}, "", "", -1 async def _sign_transaction(self, tx_hex: str) -> list[str]: """Sign our inputs in the transaction. Returns list of base64-encoded signatures in JM format. Each signature is: base64(varint(sig_len) + sig + varint(pub_len) + pub) This matches the CScript serialization format. """ import base64 try: tx_bytes = bytes.fromhex(tx_hex) tx = deserialize_transaction(tx_bytes) signatures: list[str] = [] # Build a map of (txid, vout) -> input index for the transaction # Note: txid in tx.inputs is little-endian bytes, need to convert input_index_map: dict[tuple[str, int], int] = {} for idx, tx_input in enumerate(tx.inputs): # Convert little-endian txid bytes to big-endian hex string (RPC format) txid_hex = tx_input.txid_le[::-1].hex() input_index_map[(txid_hex, tx_input.vout)] = idx for (txid, vout), utxo_info in self.our_utxos.items(): # Find the input index in the transaction utxo_key = (txid, vout) if utxo_key not in input_index_map: logger.error(f"Our UTXO {txid}:{vout} not found in transaction inputs") continue input_index = input_index_map[utxo_key] # Safety check: Fidelity bond (P2WSH) UTXOs should never be in CoinJoins if utxo_info.is_p2wsh: raise TransactionSigningError( f"Cannot sign P2WSH UTXO {txid}:{vout} in CoinJoin - " f"fidelity bond UTXOs cannot be used in CoinJoins" ) key = self.wallet.get_key_for_address(utxo_info.address) if not key: raise TransactionSigningError(f"Missing key for address {utxo_info.address}") priv_key = key.private_key pubkey_bytes = key.get_public_key_bytes(compressed=True) logger.debug( f"Signing UTXO {txid}:{vout} at input_index={input_index}, " f"value={utxo_info.value}, address={utxo_info.address}, " f"pubkey={pubkey_bytes.hex()[:16]}..." ) script_code = create_p2wpkh_script_code(pubkey_bytes) signature = sign_p2wpkh_input( tx=tx, input_index=input_index, script_code=script_code, value=utxo_info.value, private_key=priv_key, ) # Format as CScript: varint(sig_len) + sig + varint(pub_len) + pub # For lengths < 0x4c (76), varint is just the length byte sig_len = len(signature) pub_len = len(pubkey_bytes) # Build the sigmsg in JM format sigmsg = bytes([sig_len]) + signature + bytes([pub_len]) + pubkey_bytes # Base64 encode for transmission sig_b64 = base64.b64encode(sigmsg).decode("ascii") signatures.append(sig_b64) logger.debug(f"Signed input {input_index} for UTXO {txid}:{vout}") return signatures except TransactionSigningError as e: logger.error(f"Signing error: {e}") return [] except Exception as e: logger.error(f"Failed to sign transaction: {e}") return []Manages a single CoinJoin session with a taker.
Methods
async def handle_auth(self, commitment: str, revelation: dict[str, Any], kphex: str) ‑> tuple[bool, dict[str, typing.Any]]-
Expand source code
async def handle_auth( self, commitment: str, revelation: dict[str, Any], kphex: str ) -> tuple[bool, dict[str, Any]]: """ Handle !auth message from taker. CRITICAL SECURITY: Verifies PoDLE proof and taker's UTXO. Args: commitment: PoDLE commitment (should match from !fill) revelation: PoDLE revelation data kphex: Encryption key (hex) Returns: (success, response_data with UTXOs or error) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.PUBKEY_SENT: return False, {"error": "Session not in correct state for !auth"} commitment_bytes = bytes.fromhex(commitment) if commitment_bytes != self.commitment: logger.debug( f"Commitment mismatch: received={commitment[:16]}..., " f"expected={self.commitment.hex()[:16]}..." ) return False, {"error": "Commitment mismatch"} parsed_rev = parse_podle_revelation(revelation) if not parsed_rev: logger.debug(f"Failed to parse PoDLE revelation: {revelation}") return False, {"error": "Invalid PoDLE revelation format"} # Log PoDLE verification inputs at TRACE level logger.trace( f"PoDLE verification inputs: P={parsed_rev['P'].hex()[:32]}..., " f"P2={parsed_rev['P2'].hex()[:32]}..., sig={parsed_rev['sig'].hex()[:32]}..., " f"e={parsed_rev['e'].hex()[:16]}..., commitment={commitment[:16]}..." ) is_valid, error = verify_podle( parsed_rev["P"], parsed_rev["P2"], parsed_rev["sig"], parsed_rev["e"], commitment_bytes, index_range=range(self.taker_utxo_retries), ) if not is_valid: utxo_str = f"{parsed_rev['txid'][:16]}...:{parsed_rev['vout']}" logger.warning( f"PoDLE verification failed for {self.taker_nick}: {error} " f"(commitment={commitment[:16]}..., utxo={utxo_str})" ) return False, {"error": f"PoDLE verification failed: {error}"} logger.info("PoDLE proof verified ✓") logger.debug( f"PoDLE details: taker={self.taker_nick}, " f"utxo={parsed_rev['txid']}:{parsed_rev['vout']}, " f"commitment={commitment}" ) utxo_txid = parsed_rev["txid"] utxo_vout = parsed_rev["vout"] # Check for extended UTXO metadata (neutrino_compat feature) # The revelation may include scriptpubkey and blockheight taker_scriptpubkey = parsed_rev.get("scriptpubkey") taker_blockheight = parsed_rev.get("blockheight") # Track if taker sent extended format - we'll respond in kind taker_sent_extended = taker_scriptpubkey is not None and taker_blockheight is not None if taker_sent_extended: logger.debug("Taker sent extended UTXO format (neutrino_compat)") # Update our peer detection - taker supports neutrino_compat self.peer_neutrino_compat = True # Verify the taker's UTXO exists on the blockchain # Use Neutrino-compatible verification if backend requires it and metadata available if ( self.backend.requires_neutrino_metadata() and taker_scriptpubkey and taker_blockheight is not None ): # Neutrino backend: use metadata-based verification result = await self.backend.verify_utxo_with_metadata( txid=utxo_txid, vout=utxo_vout, scriptpubkey=taker_scriptpubkey, blockheight=taker_blockheight, ) if not result.valid: return False, {"error": f"Taker's UTXO verification failed: {result.error}"} taker_utxo_value = result.value taker_utxo_confirmations = result.confirmations logger.debug(f"Neutrino-verified taker's UTXO: {utxo_txid}:{utxo_vout}") else: # Full node: direct UTXO lookup taker_utxo = await self.backend.get_utxo(utxo_txid, utxo_vout) if not taker_utxo: return False, {"error": "Taker's UTXO not found on blockchain"} taker_utxo_value = taker_utxo.value taker_utxo_confirmations = taker_utxo.confirmations if taker_utxo_confirmations < self.taker_utxo_age: logger.debug( f"Taker UTXO too young: {utxo_txid}:{utxo_vout} has " f"{taker_utxo_confirmations} confirmations, need {self.taker_utxo_age}" ) return False, { "error": f"Taker's UTXO too young: " f"{taker_utxo_confirmations} < {self.taker_utxo_age}" } required_amount = int(self.amount * self.taker_utxo_amtpercent / 100) if taker_utxo_value < required_amount: logger.debug( f"Taker UTXO too small: {utxo_txid}:{utxo_vout} has " f"{taker_utxo_value} sats, need {required_amount} sats " f"({self.taker_utxo_amtpercent}% of {self.amount})" ) return False, { "error": f"Taker's UTXO too small: {taker_utxo_value} < {required_amount}" } logger.info("Taker's UTXO validated ✓") logger.debug( f"Taker UTXO details: {utxo_txid}:{utxo_vout}, " f"value={taker_utxo_value} sats, confirmations={taker_utxo_confirmations}" ) utxos_dict, cj_addr, change_addr, mixdepth = await self._select_our_utxos() if not utxos_dict: return False, {"error": "Failed to select UTXOs"} self.our_utxos = utxos_dict self.cj_address = cj_addr self.change_address = change_addr self.mixdepth = mixdepth # Format UTXOs: extended format (neutrino_compat) includes scriptpubkey:blockheight # Legacy format is just txid:vout utxo_metadata_list = [ UTXOMetadata( txid=txid, vout=vout, scriptpubkey=utxo_info.scriptpubkey, blockheight=utxo_info.height, ) for (txid, vout), utxo_info in utxos_dict.items() ] # Use extended format if peer supports neutrino_compat utxo_list_str = format_utxo_list(utxo_metadata_list, extended=self.peer_neutrino_compat) if self.peer_neutrino_compat: logger.debug("Using extended UTXO format for neutrino_compat peer") else: logger.debug("Using legacy UTXO format for legacy peer") # Get EC key for our first UTXO to sign taker's encryption key # This proves we own the UTXO we're contributing first_utxo_key, first_utxo_info = next(iter(utxos_dict.items())) auth_address = first_utxo_info.address auth_hd_key = self.wallet.get_key_for_address(auth_address) if auth_hd_key is None: return False, {"error": f"Could not get key for address {auth_address}"} # Get our EC pubkey (compressed) auth_pub_bytes = auth_hd_key.get_public_key_bytes() # Sign OUR OWN NaCl pubkey (hex string) with our EC key # This proves to the taker that we own the UTXO and links it to our encryption identity from jmcore.crypto import ecdsa_sign our_nacl_pk_hex = self.crypto.get_pubkey_hex() btc_sig = ecdsa_sign(our_nacl_pk_hex, auth_hd_key.get_private_key_bytes()) response = { "utxo_list": utxo_list_str, "auth_pub": auth_pub_bytes.hex(), "cj_addr": cj_addr, "change_addr": change_addr, "btc_sig": btc_sig, } self.state = CoinJoinState.IOAUTH_SENT logger.info(f"Sent !ioauth with {len(utxos_dict)} UTXOs") return True, response except Exception as e: logger.error(f"Failed to handle !auth: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)}Handle !auth message from taker.
CRITICAL SECURITY: Verifies PoDLE proof and taker's UTXO.
Args
commitment- PoDLE commitment (should match from !fill)
revelation- PoDLE revelation data
kphex- Encryption key (hex)
Returns
(success, response_data with UTXOs or error)
async def handle_fill(self, amount: int, commitment: str, taker_pk: str) ‑> tuple[bool, dict[str, typing.Any]]-
Expand source code
async def handle_fill( self, amount: int, commitment: str, taker_pk: str ) -> tuple[bool, dict[str, Any]]: """ Handle !fill message from taker. Args: amount: CoinJoin amount requested commitment: PoDLE commitment (will be verified later in !auth) taker_pk: Taker's NaCl public key for E2E encryption Returns: (success, response_data) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.IDLE: return False, {"error": "Session not in IDLE state"} if amount < self.offer.minsize: return False, {"error": f"Amount too small: {amount} < {self.offer.minsize}"} if amount > self.offer.maxsize: return False, {"error": f"Amount too large: {amount} > {self.offer.maxsize}"} self.amount = amount self.commitment = bytes.fromhex(commitment) self.taker_nacl_pk = taker_pk # Store for btc_sig in handle_auth self.state = CoinJoinState.FILL_RECEIVED logger.info( f"Received !fill from {self.taker_nick}: " f"amount={amount}, commitment={commitment[:16]}..., taker_pk={taker_pk[:16]}..." ) # Set up E2E encryption with taker's NaCl pubkey try: self.crypto.setup_encryption(taker_pk) logger.debug(f"Set up encryption box with taker {self.taker_nick}") except Exception as e: logger.error(f"Failed to set up encryption with taker: {e}") return False, {"error": f"Invalid taker pubkey: {e}"} # Return our NaCl pubkey and features for E2E encryption setup # Format for !pubkey: <nacl_pubkey_hex> [features=<comma-separated>] # Features are optional - legacy peers won't send them nacl_pubkey = self.crypto.get_pubkey_hex() self.state = CoinJoinState.PUBKEY_SENT # Include features in the response # neutrino_compat: We support extended UTXO format (txid:vout:scriptpubkey:blockheight) # All modern makers can accept extended format (extra fields are simply ignored) features: list[str] = ["neutrino_compat"] return True, {"nacl_pubkey": nacl_pubkey, "features": features} except Exception as e: logger.error(f"Failed to handle !fill: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)}Handle !fill message from taker.
Args
amount- CoinJoin amount requested
commitment- PoDLE commitment (will be verified later in !auth)
taker_pk- Taker's NaCl public key for E2E encryption
Returns
(success, response_data)
async def handle_tx(self, tx_hex: str) ‑> tuple[bool, dict[str, typing.Any]]-
Expand source code
async def handle_tx(self, tx_hex: str) -> tuple[bool, dict[str, Any]]: """ Handle !tx message from taker. CRITICAL SECURITY: Verifies unsigned transaction before signing! Args: tx_hex: Unsigned transaction hex Returns: (success, response_data with signatures or error) """ try: if self.is_timed_out(): self.state = CoinJoinState.FAILED return False, {"error": f"Session timed out after {self.session_timeout_sec}s"} if self.state != CoinJoinState.IOAUTH_SENT: return False, {"error": "Session not in correct state for !tx"} logger.info(f"Received !tx from {self.taker_nick}, verifying...") # Convert network string to NetworkType enum network = NetworkType(self.wallet.network) is_valid, error = verify_unsigned_transaction( tx_hex=tx_hex, our_utxos=self.our_utxos, cj_address=self.cj_address, change_address=self.change_address, amount=self.amount, cjfee=self.offer.cjfee, txfee=self.offer.txfee, offer_type=self.offer.ordertype, network=network, ) if not is_valid: logger.error(f"Transaction verification FAILED: {error}") self.state = CoinJoinState.FAILED return False, {"error": f"Transaction verification failed: {error}"} logger.info("Transaction verification PASSED ✓") signatures = await self._sign_transaction(tx_hex) # type: ignore[arg-type] if not signatures: return False, {"error": "Failed to sign transaction"} # Compute txid from the unsigned transaction for history tracking # The txid is computed from the non-witness data so we can calculate it now from jmcore.bitcoin import get_txid txid = get_txid(tx_hex) response = {"signatures": signatures, "txid": txid} self.state = CoinJoinState.SIG_SENT logger.info(f"Sent !sig with {len(signatures)} signatures (txid: {txid[:16]}...)") return True, response except Exception as e: logger.error(f"Failed to handle !tx: {e}") self.state = CoinJoinState.FAILED return False, {"error": str(e)}Handle !tx message from taker.
CRITICAL SECURITY: Verifies unsigned transaction before signing!
Args
tx_hex- Unsigned transaction hex
Returns
(success, response_data with signatures or error)
def is_timed_out(self) ‑> bool-
Expand source code
def is_timed_out(self) -> bool: """Check if the session has exceeded the timeout.""" return time.time() - self.created_at > self.session_timeout_secCheck if the session has exceeded the timeout.
def validate_channel(self, source: str) ‑> bool-
Expand source code
def validate_channel(self, source: str) -> bool: """ Validate that message comes from the same channel TYPE as the session. We only check that "direct" vs "directory" are not mixed. Messages arriving from different directory servers (dir:serverA vs dir:serverB) are expected because takers broadcast to ALL directory servers. Mixing channel types (e.g., !fill via directory, !auth via direct) could indicate: - Session confusion attack - Accidental misconfiguration - Network issues causing routing inconsistency Args: source: Message source ("direct" or "dir:<node_id>") Returns: True if channel is valid, False if it violates consistency """ source_type = self._get_channel_type(source) if not self.comm_channel: # First message - record the channel type self.comm_channel = source_type logger.debug(f"Session with {self.taker_nick} established on channel: {source_type}") return True if self.comm_channel != source_type: logger.warning( f"Channel consistency violation for {self.taker_nick}: " f"session started on '{self.comm_channel}', " f"received message on '{source_type}'" ) return False return TrueValidate that message comes from the same channel TYPE as the session.
We only check that "direct" vs "directory" are not mixed. Messages arriving from different directory servers (dir:serverA vs dir:serverB) are expected because takers broadcast to ALL directory servers.
Mixing channel types (e.g., !fill via directory, !auth via direct) could indicate: - Session confusion attack - Accidental misconfiguration - Network issues causing routing inconsistency
Args
source- Message source ("direct" or "dir:
")
Returns
True if channel is valid, False if it violates consistency
class CoinJoinState (*values)-
Expand source code
class CoinJoinState(str, Enum): """CoinJoin session states""" IDLE = "idle" FILL_RECEIVED = "fill_received" PUBKEY_SENT = "pubkey_sent" AUTH_RECEIVED = "auth_received" IOAUTH_SENT = "ioauth_sent" TX_RECEIVED = "tx_received" SIG_SENT = "sig_sent" COMPLETE = "complete" FAILED = "failed"CoinJoin session states
Ancestors
- builtins.str
- enum.Enum
Class variables
var AUTH_RECEIVED-
The type of the None singleton.
var COMPLETE-
The type of the None singleton.
var FAILED-
The type of the None singleton.
var FILL_RECEIVED-
The type of the None singleton.
var IDLE-
The type of the None singleton.
var IOAUTH_SENT-
The type of the None singleton.
var PUBKEY_SENT-
The type of the None singleton.
var SIG_SENT-
The type of the None singleton.
var TX_RECEIVED-
The type of the None singleton.