Module maker.coinjoin

CoinJoin protocol handler for makers.

Manages the maker side of the CoinJoin protocol: 1. !fill - Taker requests to fill order 2. !pubkey - Maker sends commitment pubkey 3. !auth - Taker sends PoDLE proof (VERIFY!) 4. !ioauth - Maker sends selected UTXOs 5. !tx - Taker sends unsigned transaction (VERIFY!) 6. !sig - Maker sends signatures

Classes

class CoinJoinSession (taker_nick: str,
offer: Offer,
wallet: WalletService,
backend: BlockchainBackend,
min_confirmations: int = 1,
taker_utxo_retries: int = 10,
taker_utxo_age: int = 5,
taker_utxo_amtpercent: int = 20,
session_timeout_sec: int = 300,
merge_algorithm: str = 'default')
Expand source code
class CoinJoinSession:
    """
    Manages a single CoinJoin session with a taker.
    """

    def __init__(
        self,
        taker_nick: str,
        offer: Offer,
        wallet: WalletService,
        backend: BlockchainBackend,
        min_confirmations: int = 1,
        taker_utxo_retries: int = 10,
        taker_utxo_age: int = 5,
        taker_utxo_amtpercent: int = 20,
        session_timeout_sec: int = 300,
        merge_algorithm: str = "default",
    ):
        self.taker_nick = taker_nick
        self.offer = offer
        self.wallet = wallet
        self.backend = backend
        self.min_confirmations = min_confirmations
        self.taker_utxo_retries = taker_utxo_retries
        self.taker_utxo_age = taker_utxo_age
        self.taker_utxo_amtpercent = taker_utxo_amtpercent
        self.merge_algorithm = merge_algorithm  # UTXO selection strategy

        self.state = CoinJoinState.IDLE
        self.amount = 0
        self.our_utxos: dict[tuple[str, int], UTXOInfo] = {}
        self.cj_address = ""
        self.change_address = ""
        self.mixdepth = 0
        self.commitment = b""
        self.taker_nacl_pk = ""  # Taker's NaCl pubkey (hex) for btc_sig
        self.created_at = time.time()
        self.session_timeout_sec = session_timeout_sec

        # Feature detection for extended UTXO format (neutrino_compat)
        # Initially, we use extended format if our own backend requires it (neutrino)
        # This will be updated to True if taker sends extended format during !auth
        self.peer_neutrino_compat = backend.requires_neutrino_metadata()

        # E2E encryption session with taker
        self.crypto = CryptoSession()

    def is_timed_out(self) -> bool:
        """Check if the session has exceeded the timeout."""
        return time.time() - self.created_at > self.session_timeout_sec

    async def handle_fill(
        self, amount: int, commitment: str, taker_pk: str
    ) -> tuple[bool, dict[str, Any]]:
        """
        Handle !fill message from taker.

        Args:
            amount: CoinJoin amount requested
            commitment: PoDLE commitment (will be verified later in !auth)
            taker_pk: Taker's NaCl public key for E2E encryption

        Returns:
            (success, response_data)
        """
        try:
            if self.is_timed_out():
                self.state = CoinJoinState.FAILED
                return False, {"error": f"Session timed out after {self.session_timeout_sec}s"}

            if self.state != CoinJoinState.IDLE:
                return False, {"error": "Session not in IDLE state"}

            if amount < self.offer.minsize:
                return False, {"error": f"Amount too small: {amount} < {self.offer.minsize}"}

            if amount > self.offer.maxsize:
                return False, {"error": f"Amount too large: {amount} > {self.offer.maxsize}"}

            self.amount = amount
            self.commitment = bytes.fromhex(commitment)
            self.taker_nacl_pk = taker_pk  # Store for btc_sig in handle_auth
            self.state = CoinJoinState.FILL_RECEIVED

            logger.info(
                f"Received !fill from {self.taker_nick}: "
                f"amount={amount}, commitment={commitment[:16]}..., taker_pk={taker_pk[:16]}..."
            )

            # Set up E2E encryption with taker's NaCl pubkey
            try:
                self.crypto.setup_encryption(taker_pk)
                logger.debug(f"Set up encryption box with taker {self.taker_nick}")
            except Exception as e:
                logger.error(f"Failed to set up encryption with taker: {e}")
                return False, {"error": f"Invalid taker pubkey: {e}"}

            # Return our NaCl pubkey and features for E2E encryption setup
            # Format for !pubkey: <nacl_pubkey_hex> [features=<comma-separated>]
            # Features are optional - legacy peers won't send them
            nacl_pubkey = self.crypto.get_pubkey_hex()

            self.state = CoinJoinState.PUBKEY_SENT

            # Include features in the response
            # neutrino_compat: We support extended UTXO format (txid:vout:scriptpubkey:blockheight)
            # All modern makers can accept extended format (extra fields are simply ignored)
            features: list[str] = ["neutrino_compat"]

            return True, {"nacl_pubkey": nacl_pubkey, "features": features}

        except Exception as e:
            logger.error(f"Failed to handle !fill: {e}")
            self.state = CoinJoinState.FAILED
            return False, {"error": str(e)}

    async def handle_auth(
        self, commitment: str, revelation: dict[str, Any], kphex: str
    ) -> tuple[bool, dict[str, Any]]:
        """
        Handle !auth message from taker.

        CRITICAL SECURITY: Verifies PoDLE proof and taker's UTXO.

        Args:
            commitment: PoDLE commitment (should match from !fill)
            revelation: PoDLE revelation data
            kphex: Encryption key (hex)

        Returns:
            (success, response_data with UTXOs or error)
        """
        try:
            if self.is_timed_out():
                self.state = CoinJoinState.FAILED
                return False, {"error": f"Session timed out after {self.session_timeout_sec}s"}

            if self.state != CoinJoinState.PUBKEY_SENT:
                return False, {"error": "Session not in correct state for !auth"}

            commitment_bytes = bytes.fromhex(commitment)
            if commitment_bytes != self.commitment:
                return False, {"error": "Commitment mismatch"}

            parsed_rev = parse_podle_revelation(revelation)
            if not parsed_rev:
                return False, {"error": "Invalid PoDLE revelation format"}

            is_valid, error = verify_podle(
                parsed_rev["P"],
                parsed_rev["P2"],
                parsed_rev["sig"],
                parsed_rev["e"],
                commitment_bytes,
                index_range=range(self.taker_utxo_retries),
            )

            if not is_valid:
                logger.warning(f"PoDLE verification failed: {error}")
                return False, {"error": f"PoDLE verification failed: {error}"}

            logger.info("PoDLE proof verified ✓")

            utxo_txid = parsed_rev["txid"]
            utxo_vout = parsed_rev["vout"]

            # Check for extended UTXO metadata (neutrino_compat feature)
            # The revelation may include scriptpubkey and blockheight
            taker_scriptpubkey = parsed_rev.get("scriptpubkey")
            taker_blockheight = parsed_rev.get("blockheight")

            # Track if taker sent extended format - we'll respond in kind
            taker_sent_extended = taker_scriptpubkey is not None and taker_blockheight is not None
            if taker_sent_extended:
                logger.debug("Taker sent extended UTXO format (neutrino_compat)")
                # Update our peer detection - taker supports neutrino_compat
                self.peer_neutrino_compat = True

            # Verify the taker's UTXO exists on the blockchain
            # Use Neutrino-compatible verification if backend requires it and metadata available
            if (
                self.backend.requires_neutrino_metadata()
                and taker_scriptpubkey
                and taker_blockheight is not None
            ):
                # Neutrino backend: use metadata-based verification
                result = await self.backend.verify_utxo_with_metadata(
                    txid=utxo_txid,
                    vout=utxo_vout,
                    scriptpubkey=taker_scriptpubkey,
                    blockheight=taker_blockheight,
                )
                if not result.valid:
                    return False, {"error": f"Taker's UTXO verification failed: {result.error}"}

                taker_utxo_value = result.value
                taker_utxo_confirmations = result.confirmations
                logger.debug(f"Neutrino-verified taker's UTXO: {utxo_txid}:{utxo_vout}")
            else:
                # Full node: direct UTXO lookup
                taker_utxo = await self.backend.get_utxo(utxo_txid, utxo_vout)

                if not taker_utxo:
                    return False, {"error": "Taker's UTXO not found on blockchain"}

                taker_utxo_value = taker_utxo.value
                taker_utxo_confirmations = taker_utxo.confirmations

            if taker_utxo_confirmations < self.taker_utxo_age:
                return False, {
                    "error": f"Taker's UTXO too young: "
                    f"{taker_utxo_confirmations} < {self.taker_utxo_age}"
                }

            required_amount = int(self.amount * self.taker_utxo_amtpercent / 100)
            if taker_utxo_value < required_amount:
                return False, {
                    "error": f"Taker's UTXO too small: {taker_utxo_value} < {required_amount}"
                }

            logger.info("Taker's UTXO validated ✓")

            utxos_dict, cj_addr, change_addr, mixdepth = await self._select_our_utxos()

            if not utxos_dict:
                return False, {"error": "Failed to select UTXOs"}

            self.our_utxos = utxos_dict
            self.cj_address = cj_addr
            self.change_address = change_addr
            self.mixdepth = mixdepth

            # Format UTXOs: extended format (neutrino_compat) includes scriptpubkey:blockheight
            # Legacy format is just txid:vout
            utxo_metadata_list = [
                UTXOMetadata(
                    txid=txid,
                    vout=vout,
                    scriptpubkey=utxo_info.scriptpubkey,
                    blockheight=utxo_info.height,
                )
                for (txid, vout), utxo_info in utxos_dict.items()
            ]

            # Use extended format if peer supports neutrino_compat
            utxo_list_str = format_utxo_list(utxo_metadata_list, extended=self.peer_neutrino_compat)
            if self.peer_neutrino_compat:
                logger.debug("Using extended UTXO format for neutrino_compat peer")
            else:
                logger.debug("Using legacy UTXO format for legacy peer")

            # Get EC key for our first UTXO to sign taker's encryption key
            # This proves we own the UTXO we're contributing
            first_utxo_key, first_utxo_info = next(iter(utxos_dict.items()))
            auth_address = first_utxo_info.address
            auth_hd_key = self.wallet.get_key_for_address(auth_address)

            if auth_hd_key is None:
                return False, {"error": f"Could not get key for address {auth_address}"}

            # Get our EC pubkey (compressed)
            auth_pub_bytes = auth_hd_key.get_public_key_bytes()

            # Sign OUR OWN NaCl pubkey (hex string) with our EC key
            # This proves to the taker that we own the UTXO and links it to our encryption identity
            from jmcore.crypto import ecdsa_sign

            our_nacl_pk_hex = self.crypto.get_pubkey_hex()
            btc_sig = ecdsa_sign(our_nacl_pk_hex, auth_hd_key.get_private_key_bytes())

            response = {
                "utxo_list": utxo_list_str,
                "auth_pub": auth_pub_bytes.hex(),
                "cj_addr": cj_addr,
                "change_addr": change_addr,
                "btc_sig": btc_sig,
            }

            self.state = CoinJoinState.IOAUTH_SENT
            logger.info(f"Sent !ioauth with {len(utxos_dict)} UTXOs")

            return True, response

        except Exception as e:
            logger.error(f"Failed to handle !auth: {e}")
            self.state = CoinJoinState.FAILED
            return False, {"error": str(e)}

    async def handle_tx(self, tx_hex: str) -> tuple[bool, dict[str, Any]]:
        """
        Handle !tx message from taker.

        CRITICAL SECURITY: Verifies unsigned transaction before signing!

        Args:
            tx_hex: Unsigned transaction hex

        Returns:
            (success, response_data with signatures or error)
        """
        try:
            if self.is_timed_out():
                self.state = CoinJoinState.FAILED
                return False, {"error": f"Session timed out after {self.session_timeout_sec}s"}

            if self.state != CoinJoinState.IOAUTH_SENT:
                return False, {"error": "Session not in correct state for !tx"}

            logger.info(f"Received !tx from {self.taker_nick}, verifying...")

            # Convert network string to NetworkType enum
            network = NetworkType(self.wallet.network)

            is_valid, error = verify_unsigned_transaction(
                tx_hex=tx_hex,
                our_utxos=self.our_utxos,
                cj_address=self.cj_address,
                change_address=self.change_address,
                amount=self.amount,
                cjfee=self.offer.cjfee,
                txfee=self.offer.txfee,
                offer_type=self.offer.ordertype,
                network=network,
            )

            if not is_valid:
                logger.error(f"Transaction verification FAILED: {error}")
                self.state = CoinJoinState.FAILED
                return False, {"error": f"Transaction verification failed: {error}"}

            logger.info("Transaction verification PASSED ✓")

            signatures = await self._sign_transaction(tx_hex)  # type: ignore[arg-type]

            if not signatures:
                return False, {"error": "Failed to sign transaction"}

            # Compute txid from the unsigned transaction for history tracking
            # The txid is computed from the non-witness data so we can calculate it now
            from jmcore.bitcoin import get_txid

            txid = get_txid(tx_hex)

            response = {"signatures": signatures, "txid": txid}

            self.state = CoinJoinState.SIG_SENT
            logger.info(f"Sent !sig with {len(signatures)} signatures (txid: {txid[:16]}...)")

            return True, response

        except Exception as e:
            logger.error(f"Failed to handle !tx: {e}")
            self.state = CoinJoinState.FAILED
            return False, {"error": str(e)}

    async def _select_our_utxos(
        self,
    ) -> tuple[dict[tuple[str, int], UTXOInfo], str, str, int]:
        """
        Select our UTXOs for the CoinJoin.

        Uses the configured merge_algorithm to determine UTXO selection:
        - default: Minimum UTXOs needed
        - gradual: +1 additional UTXO
        - greedy: ALL UTXOs from the mixdepth
        - random: +0 to +2 additional UTXOs

        Returns:
            (utxos_dict, cj_address, change_address, mixdepth)
        """
        try:
            from jmcore.models import OfferType

            real_cjfee = 0
            if self.offer.ordertype in (OfferType.SW0_ABSOLUTE, OfferType.SWA_ABSOLUTE):
                real_cjfee = int(self.offer.cjfee)
            else:
                from jmcore.bitcoin import calculate_relative_fee

                real_cjfee = calculate_relative_fee(self.amount, str(self.offer.cjfee))

            total_amount = self.amount + self.offer.txfee
            required_amount = total_amount + 10000 - real_cjfee

            balances = {}
            for md in range(self.wallet.mixdepth_count):
                balance = await self.wallet.get_balance(md)
                balances[md] = balance

            eligible_mixdepths = {md: bal for md, bal in balances.items() if bal >= required_amount}

            if not eligible_mixdepths:
                logger.error(f"No mixdepth with sufficient balance: need {required_amount}")
                return {}, "", "", -1

            max_mixdepth = max(eligible_mixdepths, key=lambda md: eligible_mixdepths[md])

            # Use merge algorithm for UTXO selection
            # Makers can consolidate UTXOs "for free" since takers pay all fees
            selected = self.wallet.select_utxos_with_merge(
                max_mixdepth,
                required_amount,
                self.min_confirmations,
                merge_algorithm=self.merge_algorithm,
            )

            utxos_dict = {(utxo.txid, utxo.vout): utxo for utxo in selected}

            cj_output_mixdepth = (max_mixdepth + 1) % self.wallet.mixdepth_count
            cj_index = self.wallet.get_next_address_index(cj_output_mixdepth, 1)
            cj_address = self.wallet.get_change_address(cj_output_mixdepth, cj_index)

            change_index = self.wallet.get_next_address_index(max_mixdepth, 1)
            change_address = self.wallet.get_change_address(max_mixdepth, change_index)

            logger.info(
                f"Selected {len(selected)} UTXOs from mixdepth {max_mixdepth} "
                f"(merge_algorithm={self.merge_algorithm}), "
                f"total value: {sum(u.value for u in selected)} sats"
            )

            return utxos_dict, cj_address, change_address, max_mixdepth

        except Exception as e:
            logger.error(f"Failed to select UTXOs: {e}")
            return {}, "", "", -1

    async def _sign_transaction(self, tx_hex: str) -> list[str]:
        """Sign our inputs in the transaction.

        Returns list of base64-encoded signatures in JM format.
        Each signature is: base64(varint(sig_len) + sig + varint(pub_len) + pub)
        This matches the CScript serialization format.
        """
        import base64

        try:
            tx_bytes = bytes.fromhex(tx_hex)
            tx = deserialize_transaction(tx_bytes)

            signatures: list[str] = []

            # Build a map of (txid, vout) -> input index for the transaction
            # Note: txid in tx.inputs is little-endian bytes, need to convert
            input_index_map: dict[tuple[str, int], int] = {}
            for idx, tx_input in enumerate(tx.inputs):
                # Convert little-endian txid bytes to big-endian hex string (RPC format)
                txid_hex = tx_input.txid_le[::-1].hex()
                input_index_map[(txid_hex, tx_input.vout)] = idx

            for (txid, vout), utxo_info in self.our_utxos.items():
                # Find the input index in the transaction
                utxo_key = (txid, vout)
                if utxo_key not in input_index_map:
                    logger.error(f"Our UTXO {txid}:{vout} not found in transaction inputs")
                    continue

                input_index = input_index_map[utxo_key]

                # Safety check: Fidelity bond (P2WSH) UTXOs should never be in CoinJoins
                if utxo_info.is_p2wsh:
                    raise TransactionSigningError(
                        f"Cannot sign P2WSH UTXO {txid}:{vout} in CoinJoin - "
                        f"fidelity bond UTXOs cannot be used in CoinJoins"
                    )

                key = self.wallet.get_key_for_address(utxo_info.address)
                if not key:
                    raise TransactionSigningError(f"Missing key for address {utxo_info.address}")

                priv_key = key.private_key
                pubkey_bytes = key.get_public_key_bytes(compressed=True)

                logger.debug(
                    f"Signing UTXO {txid}:{vout} at input_index={input_index}, "
                    f"value={utxo_info.value}, address={utxo_info.address}, "
                    f"pubkey={pubkey_bytes.hex()[:16]}..."
                )

                script_code = create_p2wpkh_script_code(pubkey_bytes)
                signature = sign_p2wpkh_input(
                    tx=tx,
                    input_index=input_index,
                    script_code=script_code,
                    value=utxo_info.value,
                    private_key=priv_key,
                )

                # Format as CScript: varint(sig_len) + sig + varint(pub_len) + pub
                # For lengths < 0x4c (76), varint is just the length byte
                sig_len = len(signature)
                pub_len = len(pubkey_bytes)

                # Build the sigmsg in JM format
                sigmsg = bytes([sig_len]) + signature + bytes([pub_len]) + pubkey_bytes

                # Base64 encode for transmission
                sig_b64 = base64.b64encode(sigmsg).decode("ascii")
                signatures.append(sig_b64)

                logger.debug(f"Signed input {input_index} for UTXO {txid}:{vout}")

            return signatures

        except TransactionSigningError as e:
            logger.error(f"Signing error: {e}")
            return []
        except Exception as e:
            logger.error(f"Failed to sign transaction: {e}")
            return []

Manages a single CoinJoin session with a taker.

Methods

async def handle_auth(self, commitment: str, revelation: dict[str, Any], kphex: str) ‑> tuple[bool, dict[str, typing.Any]]
Expand source code
async def handle_auth(
    self, commitment: str, revelation: dict[str, Any], kphex: str
) -> tuple[bool, dict[str, Any]]:
    """
    Handle !auth message from taker.

    CRITICAL SECURITY: Verifies PoDLE proof and taker's UTXO.

    Args:
        commitment: PoDLE commitment (should match from !fill)
        revelation: PoDLE revelation data
        kphex: Encryption key (hex)

    Returns:
        (success, response_data with UTXOs or error)
    """
    try:
        if self.is_timed_out():
            self.state = CoinJoinState.FAILED
            return False, {"error": f"Session timed out after {self.session_timeout_sec}s"}

        if self.state != CoinJoinState.PUBKEY_SENT:
            return False, {"error": "Session not in correct state for !auth"}

        commitment_bytes = bytes.fromhex(commitment)
        if commitment_bytes != self.commitment:
            return False, {"error": "Commitment mismatch"}

        parsed_rev = parse_podle_revelation(revelation)
        if not parsed_rev:
            return False, {"error": "Invalid PoDLE revelation format"}

        is_valid, error = verify_podle(
            parsed_rev["P"],
            parsed_rev["P2"],
            parsed_rev["sig"],
            parsed_rev["e"],
            commitment_bytes,
            index_range=range(self.taker_utxo_retries),
        )

        if not is_valid:
            logger.warning(f"PoDLE verification failed: {error}")
            return False, {"error": f"PoDLE verification failed: {error}"}

        logger.info("PoDLE proof verified ✓")

        utxo_txid = parsed_rev["txid"]
        utxo_vout = parsed_rev["vout"]

        # Check for extended UTXO metadata (neutrino_compat feature)
        # The revelation may include scriptpubkey and blockheight
        taker_scriptpubkey = parsed_rev.get("scriptpubkey")
        taker_blockheight = parsed_rev.get("blockheight")

        # Track if taker sent extended format - we'll respond in kind
        taker_sent_extended = taker_scriptpubkey is not None and taker_blockheight is not None
        if taker_sent_extended:
            logger.debug("Taker sent extended UTXO format (neutrino_compat)")
            # Update our peer detection - taker supports neutrino_compat
            self.peer_neutrino_compat = True

        # Verify the taker's UTXO exists on the blockchain
        # Use Neutrino-compatible verification if backend requires it and metadata available
        if (
            self.backend.requires_neutrino_metadata()
            and taker_scriptpubkey
            and taker_blockheight is not None
        ):
            # Neutrino backend: use metadata-based verification
            result = await self.backend.verify_utxo_with_metadata(
                txid=utxo_txid,
                vout=utxo_vout,
                scriptpubkey=taker_scriptpubkey,
                blockheight=taker_blockheight,
            )
            if not result.valid:
                return False, {"error": f"Taker's UTXO verification failed: {result.error}"}

            taker_utxo_value = result.value
            taker_utxo_confirmations = result.confirmations
            logger.debug(f"Neutrino-verified taker's UTXO: {utxo_txid}:{utxo_vout}")
        else:
            # Full node: direct UTXO lookup
            taker_utxo = await self.backend.get_utxo(utxo_txid, utxo_vout)

            if not taker_utxo:
                return False, {"error": "Taker's UTXO not found on blockchain"}

            taker_utxo_value = taker_utxo.value
            taker_utxo_confirmations = taker_utxo.confirmations

        if taker_utxo_confirmations < self.taker_utxo_age:
            return False, {
                "error": f"Taker's UTXO too young: "
                f"{taker_utxo_confirmations} < {self.taker_utxo_age}"
            }

        required_amount = int(self.amount * self.taker_utxo_amtpercent / 100)
        if taker_utxo_value < required_amount:
            return False, {
                "error": f"Taker's UTXO too small: {taker_utxo_value} < {required_amount}"
            }

        logger.info("Taker's UTXO validated ✓")

        utxos_dict, cj_addr, change_addr, mixdepth = await self._select_our_utxos()

        if not utxos_dict:
            return False, {"error": "Failed to select UTXOs"}

        self.our_utxos = utxos_dict
        self.cj_address = cj_addr
        self.change_address = change_addr
        self.mixdepth = mixdepth

        # Format UTXOs: extended format (neutrino_compat) includes scriptpubkey:blockheight
        # Legacy format is just txid:vout
        utxo_metadata_list = [
            UTXOMetadata(
                txid=txid,
                vout=vout,
                scriptpubkey=utxo_info.scriptpubkey,
                blockheight=utxo_info.height,
            )
            for (txid, vout), utxo_info in utxos_dict.items()
        ]

        # Use extended format if peer supports neutrino_compat
        utxo_list_str = format_utxo_list(utxo_metadata_list, extended=self.peer_neutrino_compat)
        if self.peer_neutrino_compat:
            logger.debug("Using extended UTXO format for neutrino_compat peer")
        else:
            logger.debug("Using legacy UTXO format for legacy peer")

        # Get EC key for our first UTXO to sign taker's encryption key
        # This proves we own the UTXO we're contributing
        first_utxo_key, first_utxo_info = next(iter(utxos_dict.items()))
        auth_address = first_utxo_info.address
        auth_hd_key = self.wallet.get_key_for_address(auth_address)

        if auth_hd_key is None:
            return False, {"error": f"Could not get key for address {auth_address}"}

        # Get our EC pubkey (compressed)
        auth_pub_bytes = auth_hd_key.get_public_key_bytes()

        # Sign OUR OWN NaCl pubkey (hex string) with our EC key
        # This proves to the taker that we own the UTXO and links it to our encryption identity
        from jmcore.crypto import ecdsa_sign

        our_nacl_pk_hex = self.crypto.get_pubkey_hex()
        btc_sig = ecdsa_sign(our_nacl_pk_hex, auth_hd_key.get_private_key_bytes())

        response = {
            "utxo_list": utxo_list_str,
            "auth_pub": auth_pub_bytes.hex(),
            "cj_addr": cj_addr,
            "change_addr": change_addr,
            "btc_sig": btc_sig,
        }

        self.state = CoinJoinState.IOAUTH_SENT
        logger.info(f"Sent !ioauth with {len(utxos_dict)} UTXOs")

        return True, response

    except Exception as e:
        logger.error(f"Failed to handle !auth: {e}")
        self.state = CoinJoinState.FAILED
        return False, {"error": str(e)}

Handle !auth message from taker.

CRITICAL SECURITY: Verifies PoDLE proof and taker's UTXO.

Args

commitment
PoDLE commitment (should match from !fill)
revelation
PoDLE revelation data
kphex
Encryption key (hex)

Returns

(success, response_data with UTXOs or error)

async def handle_fill(self, amount: int, commitment: str, taker_pk: str) ‑> tuple[bool, dict[str, typing.Any]]
Expand source code
async def handle_fill(
    self, amount: int, commitment: str, taker_pk: str
) -> tuple[bool, dict[str, Any]]:
    """
    Handle !fill message from taker.

    Args:
        amount: CoinJoin amount requested
        commitment: PoDLE commitment (will be verified later in !auth)
        taker_pk: Taker's NaCl public key for E2E encryption

    Returns:
        (success, response_data)
    """
    try:
        if self.is_timed_out():
            self.state = CoinJoinState.FAILED
            return False, {"error": f"Session timed out after {self.session_timeout_sec}s"}

        if self.state != CoinJoinState.IDLE:
            return False, {"error": "Session not in IDLE state"}

        if amount < self.offer.minsize:
            return False, {"error": f"Amount too small: {amount} < {self.offer.minsize}"}

        if amount > self.offer.maxsize:
            return False, {"error": f"Amount too large: {amount} > {self.offer.maxsize}"}

        self.amount = amount
        self.commitment = bytes.fromhex(commitment)
        self.taker_nacl_pk = taker_pk  # Store for btc_sig in handle_auth
        self.state = CoinJoinState.FILL_RECEIVED

        logger.info(
            f"Received !fill from {self.taker_nick}: "
            f"amount={amount}, commitment={commitment[:16]}..., taker_pk={taker_pk[:16]}..."
        )

        # Set up E2E encryption with taker's NaCl pubkey
        try:
            self.crypto.setup_encryption(taker_pk)
            logger.debug(f"Set up encryption box with taker {self.taker_nick}")
        except Exception as e:
            logger.error(f"Failed to set up encryption with taker: {e}")
            return False, {"error": f"Invalid taker pubkey: {e}"}

        # Return our NaCl pubkey and features for E2E encryption setup
        # Format for !pubkey: <nacl_pubkey_hex> [features=<comma-separated>]
        # Features are optional - legacy peers won't send them
        nacl_pubkey = self.crypto.get_pubkey_hex()

        self.state = CoinJoinState.PUBKEY_SENT

        # Include features in the response
        # neutrino_compat: We support extended UTXO format (txid:vout:scriptpubkey:blockheight)
        # All modern makers can accept extended format (extra fields are simply ignored)
        features: list[str] = ["neutrino_compat"]

        return True, {"nacl_pubkey": nacl_pubkey, "features": features}

    except Exception as e:
        logger.error(f"Failed to handle !fill: {e}")
        self.state = CoinJoinState.FAILED
        return False, {"error": str(e)}

Handle !fill message from taker.

Args

amount
CoinJoin amount requested
commitment
PoDLE commitment (will be verified later in !auth)
taker_pk
Taker's NaCl public key for E2E encryption

Returns

(success, response_data)

async def handle_tx(self, tx_hex: str) ‑> tuple[bool, dict[str, typing.Any]]
Expand source code
async def handle_tx(self, tx_hex: str) -> tuple[bool, dict[str, Any]]:
    """
    Handle !tx message from taker.

    CRITICAL SECURITY: Verifies unsigned transaction before signing!

    Args:
        tx_hex: Unsigned transaction hex

    Returns:
        (success, response_data with signatures or error)
    """
    try:
        if self.is_timed_out():
            self.state = CoinJoinState.FAILED
            return False, {"error": f"Session timed out after {self.session_timeout_sec}s"}

        if self.state != CoinJoinState.IOAUTH_SENT:
            return False, {"error": "Session not in correct state for !tx"}

        logger.info(f"Received !tx from {self.taker_nick}, verifying...")

        # Convert network string to NetworkType enum
        network = NetworkType(self.wallet.network)

        is_valid, error = verify_unsigned_transaction(
            tx_hex=tx_hex,
            our_utxos=self.our_utxos,
            cj_address=self.cj_address,
            change_address=self.change_address,
            amount=self.amount,
            cjfee=self.offer.cjfee,
            txfee=self.offer.txfee,
            offer_type=self.offer.ordertype,
            network=network,
        )

        if not is_valid:
            logger.error(f"Transaction verification FAILED: {error}")
            self.state = CoinJoinState.FAILED
            return False, {"error": f"Transaction verification failed: {error}"}

        logger.info("Transaction verification PASSED ✓")

        signatures = await self._sign_transaction(tx_hex)  # type: ignore[arg-type]

        if not signatures:
            return False, {"error": "Failed to sign transaction"}

        # Compute txid from the unsigned transaction for history tracking
        # The txid is computed from the non-witness data so we can calculate it now
        from jmcore.bitcoin import get_txid

        txid = get_txid(tx_hex)

        response = {"signatures": signatures, "txid": txid}

        self.state = CoinJoinState.SIG_SENT
        logger.info(f"Sent !sig with {len(signatures)} signatures (txid: {txid[:16]}...)")

        return True, response

    except Exception as e:
        logger.error(f"Failed to handle !tx: {e}")
        self.state = CoinJoinState.FAILED
        return False, {"error": str(e)}

Handle !tx message from taker.

CRITICAL SECURITY: Verifies unsigned transaction before signing!

Args

tx_hex
Unsigned transaction hex

Returns

(success, response_data with signatures or error)

def is_timed_out(self) ‑> bool
Expand source code
def is_timed_out(self) -> bool:
    """Check if the session has exceeded the timeout."""
    return time.time() - self.created_at > self.session_timeout_sec

Check if the session has exceeded the timeout.

class CoinJoinState (*values)
Expand source code
class CoinJoinState(str, Enum):
    """CoinJoin session states"""

    IDLE = "idle"
    FILL_RECEIVED = "fill_received"
    PUBKEY_SENT = "pubkey_sent"
    AUTH_RECEIVED = "auth_received"
    IOAUTH_SENT = "ioauth_sent"
    TX_RECEIVED = "tx_received"
    SIG_SENT = "sig_sent"
    COMPLETE = "complete"
    FAILED = "failed"

CoinJoin session states

Ancestors

  • builtins.str
  • enum.Enum

Class variables

var AUTH_RECEIVED

The type of the None singleton.

var COMPLETE

The type of the None singleton.

var FAILED

The type of the None singleton.

var FILL_RECEIVED

The type of the None singleton.

var IDLE

The type of the None singleton.

var IOAUTH_SENT

The type of the None singleton.

var PUBKEY_SENT

The type of the None singleton.

var SIG_SENT

The type of the None singleton.

var TX_RECEIVED

The type of the None singleton.