Module jmcore.podle

Proof of Discrete Log Equivalence (PoDLE) for JoinMarket.

PoDLE is used to prevent sybil attacks in JoinMarket by requiring takers to prove ownership of a UTXO without revealing which UTXO until after the maker commits to participate.

This module provides both generation (for takers) and verification (for makers) of PoDLE proofs.

Protocol flow:
1. Taker generates commitment C = H(P2), where P2 = k*J (k = private key, J = NUMS point).
2. Taker sends commitment C to maker.
3. Maker accepts and sends pubkey.
4. Taker reveals P, P2, sig, e as the "revelation".
5. Maker verifies: P = k*G and P2 = k*J (same k).

Reference: https://gist.github.com/AdamISZ/9cbba5e9408d23813ca8
Reference: joinmarket-clientserver/src/jmclient/podle.py

Functions

def deserialize_revelation(revelation_str: str) ‑> dict[str, typing.Any] | None
Expand source code
def deserialize_revelation(revelation_str: str) -> dict[str, Any] | None:
    """
    Deserialize PoDLE revelation from wire format.

    Format: P|P2|sig|e|utxo (pipe-separated hex strings)
    """
    try:
        parts = revelation_str.split("|")
        if len(parts) != 5:
            logger.warning(f"Invalid revelation format: expected 5 parts, got {len(parts)}")
            return None

        return {
            "P": parts[0],
            "P2": parts[1],
            "sig": parts[2],
            "e": parts[3],
            "utxo": parts[4],
        }

    except Exception as e:
        logger.error(f"Failed to deserialize PoDLE revelation: {e}")
        return None

Deserialize PoDLE revelation from wire format.

Format: P|P2|sig|e|utxo (pipe-separated hex strings)

def generate_podle(private_key_bytes: bytes, utxo_str: str, index: int = 0) ‑> PoDLECommitment
Expand source code
def generate_podle(
    private_key_bytes: bytes,
    utxo_str: str,
    index: int = 0,
) -> PoDLECommitment:
    """
    Generate a PoDLE commitment for a UTXO.

    Builds P = k*G and P2 = k*J from the private key k and the NUMS point J
    selected by ``index``, then produces a Schnorr-like proof (e, s) that
    both points share the same discrete log k, without revealing k.

    Args:
        private_key_bytes: 32-byte private key (big-endian scalar)
        utxo_str: UTXO reference as "txid:vout"; stored and logged as-is
        index: NUMS point index (0-9); must be present in PRECOMPUTED_NUMS

    Returns:
        PoDLECommitment with commitment = sha256(P2), P, P2, the response s
        (as ``sig``), the challenge digest e, the UTXO string and the index.

    Raises:
        PoDLEError: if the key is not 32 bytes, the index is unknown, or the
            key value is zero or not below the curve order.
    """
    if len(private_key_bytes) != 32:
        raise PoDLEError(f"Invalid private key length: {len(private_key_bytes)}")

    if index not in PRECOMPUTED_NUMS:
        raise PoDLEError(f"Invalid NUMS index: {index}")

    # Private key as an integer; must be a valid non-zero scalar below n.
    k = int.from_bytes(private_key_bytes, "big")
    if k == 0 or k >= SECP256K1_N:
        raise PoDLEError("Invalid private key value")

    # P = k*G (standard public key)
    p_point = scalar_mult_g(k)
    p_bytes = point_to_bytes(p_point)

    # NUMS point J for the requested index
    j_point = get_nums_point(index)

    # P2 = k*J
    p2_point = point_mult(k, j_point)
    p2_bytes = point_to_bytes(p2_point)

    # Commitment C = H(P2)
    commitment = hashlib.sha256(p2_bytes).digest()

    # Random proof nonce drawn uniformly from [1, n-1]. randbelow avoids the
    # modulo bias of reducing 32 random bytes, and never yields zero, so no
    # fallback to a fixed (and therefore key-leaking) nonce is needed.
    k_proof = secrets.randbelow(SECP256K1_N - 1) + 1

    # Kg = k_proof * G
    kg_point = scalar_mult_g(k_proof)
    kg_bytes = point_to_bytes(kg_point)

    # Kj = k_proof * J
    kj_point = point_mult(k_proof, j_point)
    kj_bytes = point_to_bytes(kj_point)

    # Challenge e = H(Kg || Kj || P || P2). The raw digest is what gets
    # returned; the value reduced mod n is used only for the scalar math.
    e_bytes = hashlib.sha256(kg_bytes + kj_bytes + p_bytes + p2_bytes).digest()
    e = int.from_bytes(e_bytes, "big") % SECP256K1_N

    # Response s = k_proof + e * k (mod n) - JAM compatible
    s = (k_proof + e * k) % SECP256K1_N
    s_bytes = s.to_bytes(32, "big")

    logger.debug(
        f"Generated PoDLE for {utxo_str} using NUMS index {index}, "
        f"commitment={commitment.hex()[:16]}..."
    )

    return PoDLECommitment(
        commitment=commitment,
        p=p_bytes,
        p2=p2_bytes,
        sig=s_bytes,
        e=e_bytes,
        utxo=utxo_str,
        index=index,
    )

Generate a PoDLE commitment for a UTXO.

The PoDLE proves that the taker owns the UTXO without revealing the private key. It creates a zero-knowledge proof that: P = k*G and P2 = k*J have the same discrete log k.

Args

private_key_bytes
32-byte private key
utxo_str
UTXO reference as "txid:vout"
index
NUMS point index (0-9)

Returns

PoDLECommitment with all proof data

def get_nums_point(index: int) ‑> coincurve.keys.PublicKey
Expand source code
def get_nums_point(index: int) -> PublicKey:
    """Return the precomputed NUMS generator point J for ``index``.

    Raises:
        PoDLEError: if ``index`` is not in PRECOMPUTED_NUMS.
    """
    if index in PRECOMPUTED_NUMS:
        # Stored value is the serialized point; wrap it as a coincurve key.
        return PublicKey(PRECOMPUTED_NUMS[index])

    raise PoDLEError(f"NUMS point index {index} not supported (max 9)")

Get Nothing-Up-My-Sleeve (NUMS) generator point J for given index.

def parse_podle_revelation(revelation: dict[str, Any]) ‑> dict[str, typing.Any] | None
Expand source code
def parse_podle_revelation(revelation: dict[str, Any]) -> dict[str, Any] | None:
    """
    Parse and validate PoDLE revelation structure.

    Expected format from taker:
    {
        'P': <hex string>,
        'P2': <hex string>,
        'sig': <hex string>,
        'e': <hex string>,
        'utxo': <txid:vout or txid:vout:scriptpubkey:blockheight string>
    }

    Returns parsed structure with bytes, or None if invalid.
    Extended format includes scriptpubkey and blockheight for neutrino_compat feature.
    """
    try:
        required_fields = ["P", "P2", "sig", "e", "utxo"]
        for field in required_fields:
            if field not in revelation:
                logger.warning(f"Missing required field in PoDLE revelation: {field}")
                return None

        p_bytes = bytes.fromhex(revelation["P"])
        p2_bytes = bytes.fromhex(revelation["P2"])
        sig_bytes = bytes.fromhex(revelation["sig"])
        e_bytes = bytes.fromhex(revelation["e"])

        utxo_parts = revelation["utxo"].split(":")

        # Legacy format: txid:vout (2 parts)
        # Extended format: txid:vout:scriptpubkey:blockheight (4 parts)
        if len(utxo_parts) == 2:
            txid = utxo_parts[0]
            vout = int(utxo_parts[1])
            scriptpubkey = None
            blockheight = None
        elif len(utxo_parts) == 4:
            txid = utxo_parts[0]
            vout = int(utxo_parts[1])
            scriptpubkey = utxo_parts[2]
            blockheight = int(utxo_parts[3])
            logger.debug(f"Parsed extended UTXO format: {txid}:{vout} with metadata")
        else:
            logger.warning(f"Invalid UTXO format: {revelation['utxo']}")
            return None

        result: dict[str, Any] = {
            "P": p_bytes,
            "P2": p2_bytes,
            "sig": sig_bytes,
            "e": e_bytes,
            "txid": txid,
            "vout": vout,
        }

        # Add extended metadata if present
        if scriptpubkey is not None:
            result["scriptpubkey"] = scriptpubkey
        if blockheight is not None:
            result["blockheight"] = blockheight

        return result

    except Exception as e:
        logger.error(f"Failed to parse PoDLE revelation: {e}")
        return None

Parse and validate PoDLE revelation structure.

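Expected format from taker: { 'P': <hex string>, 'P2': <hex string>, 'sig': <hex string>, 'e': <hex string>, 'utxo': <txid:vout or txid:vout:scriptpubkey:blockheight string> }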

Returns parsed structure with bytes, or None if invalid. Extended format includes scriptpubkey and blockheight for neutrino_compat feature.

def point_add(p1: PublicKey, p2: PublicKey) ‑> coincurve.keys.PublicKey
Expand source code
def point_add(p1: PublicKey, p2: PublicKey) -> PublicKey:
    """Return the sum of two EC points, computed with coincurve's ``combine``."""
    others = [p2]
    return p1.combine(others)

Add two EC points using coincurve.

def point_mult(scalar: int, point: PublicKey) ‑> coincurve.keys.PublicKey
Expand source code
def point_mult(scalar: int, point: PublicKey) -> PublicKey:
    """Return ``scalar * point`` using coincurve.

    The scalar is reduced modulo SECP256K1_N first.

    Raises:
        PoDLEError: if the reduced scalar is zero.
    """
    reduced = scalar % SECP256K1_N
    if not reduced:
        raise PoDLEError("Scalar cannot be zero")
    # coincurve takes the multiplier as a 32-byte big-endian value.
    return point.multiply(reduced.to_bytes(32, "big"))

Multiply EC point by scalar using coincurve.

def point_to_bytes(point: PublicKey) ‑> bytes
Expand source code
def point_to_bytes(point: PublicKey) -> bytes:
    """Serialize an EC point using coincurve's compressed encoding."""
    encoded = point.format(compressed=True)
    return encoded

Convert EC point to compressed bytes.

def scalar_mult_g(scalar: int) ‑> coincurve.keys.PublicKey
Expand source code
def scalar_mult_g(scalar: int) -> PublicKey:
    """Return ``scalar * G``, i.e. the public key for ``scalar`` as a secret.

    The scalar is reduced modulo SECP256K1_N first.

    Raises:
        PoDLEError: if the reduced scalar is zero.
    """
    reduced = scalar % SECP256K1_N
    if not reduced:
        raise PoDLEError("Scalar cannot be zero")
    # The secret is handed to coincurve as a 32-byte big-endian value.
    return PublicKey.from_secret(reduced.to_bytes(32, "big"))

Multiply generator G by scalar (creates public key from private key).

def serialize_revelation(commitment: PoDLECommitment) ‑> str
Expand source code
def serialize_revelation(commitment: PoDLECommitment) -> str:
    """
    Render a PoDLE revelation in wire format.

    Output is P|P2|sig|e|utxo: the four byte fields hex-encoded, followed by
    the UTXO string unchanged, all joined with "|".
    """
    byte_fields = (commitment.p, commitment.p2, commitment.sig, commitment.e)
    pieces = [raw.hex() for raw in byte_fields]
    pieces.append(commitment.utxo)
    return "|".join(pieces)

Serialize PoDLE revelation to wire format.

Format: P|P2|sig|e|utxo (pipe-separated hex strings)

def verify_podle(p: bytes,
p2: bytes,
sig: bytes,
e: bytes,
commitment: bytes,
index_range: range = range(0, 10)) ‑> tuple[bool, str]
Expand source code
def verify_podle(
    p: bytes,
    p2: bytes,
    sig: bytes,
    e: bytes,
    commitment: bytes,
    index_range: range = range(10),
) -> tuple[bool, str]:
    """
    Verify PoDLE proof.

    Verifies that P and P2 have the same discrete log (private key)
    without revealing the private key itself. For each allowed NUMS point J
    it recomputes Kg = s*G - e*P and Kj = s*J - e*P2 and accepts when
    sha256(Kg || Kj || P || P2) equals ``e``.

    Args:
        p: Public key bytes (33 bytes compressed)
        p2: Commitment public key bytes (33 bytes compressed)
        sig: Signature s value (32 bytes)
        e: Challenge e value (32 bytes)
        commitment: sha256(P2) commitment (32 bytes)
        index_range: Allowed NUMS indices to try

    Returns:
        (is_valid, error_message); error_message is "" on success. This
        function does not raise: any exception is reported as a failure.
    """
    try:
        if len(p) != 33:
            return False, f"Invalid P length: {len(p)}, expected 33"
        if len(p2) != 33:
            return False, f"Invalid P2 length: {len(p2)}, expected 33"
        if len(sig) != 32:
            return False, f"Invalid sig length: {len(sig)}, expected 32"
        if len(e) != 32:
            return False, f"Invalid e length: {len(e)}, expected 32"
        if len(commitment) != 32:
            return False, f"Invalid commitment length: {len(commitment)}, expected 32"

        expected_commitment = hashlib.sha256(p2).digest()
        if commitment != expected_commitment:
            return False, "Commitment does not match H(P2)"

        p_point = PublicKey(p)
        p2_point = PublicKey(p2)

        s_int = int.from_bytes(sig, "big")
        e_int = int.from_bytes(e, "big")

        if s_int >= SECP256K1_N or e_int >= SECP256K1_N:
            return False, "Signature values out of range"

        # sg = s * G
        sg = scalar_mult_g(s_int) if s_int > 0 else None

        # Compute -e mod N for subtraction (JAM compatible: s = k + e*x, verify Kg = s*G - e*P)
        minus_e_int = (-e_int) % SECP256K1_N

        # Kg and (-e)*P2 do not depend on the NUMS index, so compute them once
        # rather than on every loop iteration. A failure here (e.g. e == 0
        # makes point_mult raise) would have failed identically for every
        # index, so report the same result the per-index loop would have.
        try:
            # Kg = s*G - e*P = s*G + (-e)*P (JAM compatible verification)
            minus_e_p = point_mult(minus_e_int, p_point)
            kg = point_add(sg, minus_e_p) if sg is not None else minus_e_p
            kg_bytes = point_to_bytes(kg)

            minus_e_p2 = point_mult(minus_e_int, p2_point)
        except Exception as ex:
            logger.debug(f"PoDLE verification failed before trying NUMS indices: {ex}")
            return False, f"PoDLE verification failed for all indices in {index_range}"

        for index in index_range:
            try:
                j = get_nums_point(index)

                # Kj = s*J - e*P2 = s*J + (-e)*P2
                if s_int > 0:
                    sj = point_mult(s_int, j)
                    kj = point_add(sj, minus_e_p2)
                else:
                    kj = minus_e_p2

                kj_bytes = point_to_bytes(kj)

                e_check = hashlib.sha256(kg_bytes + kj_bytes + p + p2).digest()

                if e_check == e:
                    logger.debug(f"PoDLE verification successful at index {index}")
                    return True, ""

            except Exception as ex:
                # Unsupported index or a degenerate point sum: try the next J.
                logger.debug(f"PoDLE verification failed at index {index}: {ex}")
                continue

        return False, f"PoDLE verification failed for all indices in {index_range}"

    except Exception as ex:
        logger.error(f"PoDLE verification error: {ex}")
        return False, f"Verification error: {ex}"

Verify PoDLE proof.

Verifies that P and P2 have the same discrete log (private key) without revealing the private key itself.

Args

p
Public key bytes (33 bytes compressed)
p2
Commitment public key bytes (33 bytes compressed)
sig
Signature s value (32 bytes)
e
Challenge e value (32 bytes)
commitment
sha256(P2) commitment (32 bytes)
index_range
Allowed NUMS indices to try

Returns

(is_valid, error_message)

Classes

class PoDLECommitment (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class PoDLECommitment:
    """All values a taker produces for one PoDLE proof."""

    commitment: bytes  # sha256 of P2, 32 bytes
    p: bytes  # P = k*G, compressed public key, 33 bytes
    p2: bytes  # P2 = k*J, compressed commitment point, 33 bytes
    sig: bytes  # Schnorr-style response s, 32 bytes
    e: bytes  # challenge e, 32 bytes
    utxo: str  # UTXO reference, "txid:vout"
    index: int  # index of the NUMS point used

    def to_revelation(self) -> dict[str, str]:
        """Build the revelation dict sent to the maker.

        The byte fields are hex-encoded under the keys "P", "P2", "sig" and
        "e"; "utxo" carries the UTXO string unchanged.
        """
        byte_fields = {"P": self.p, "P2": self.p2, "sig": self.sig, "e": self.e}
        revelation = {key: raw.hex() for key, raw in byte_fields.items()}
        revelation["utxo"] = self.utxo
        return revelation

    def to_commitment_str(self) -> str:
        """
        Return the commitment as a type-prefixed hex string.

        JoinMarket prefixes commitments with a type character so other
        commitment schemes can be added later; "P" marks a standard PoDLE
        commitment. Format: "P" + hex(commitment)
        """
        return f"P{self.commitment.hex()}"

PoDLE commitment data generated by taker.

Instance variables

var commitment : bytes

H(P2) - 32 bytes.

var e : bytes

Challenge e - 32 bytes.

var index : int

NUMS point index used.

var p : bytes

Public key P = k*G - 33 bytes compressed.

var p2 : bytes

Commitment point P2 = k*J - 33 bytes compressed.

var sig : bytes

Schnorr signature s - 32 bytes.

var utxo : str

UTXO reference "txid:vout".

Methods

def to_commitment_str(self) ‑> str
Expand source code
def to_commitment_str(self) -> str:
    """
    Return the commitment as a type-prefixed hex string.

    JoinMarket prefixes commitments with a type character so other
    commitment schemes can be added later; "P" marks a standard PoDLE
    commitment. Format: "P" + hex(commitment)
    """
    return f"P{self.commitment.hex()}"

Get commitment as string with type prefix.

JoinMarket requires a commitment type prefix to allow future commitment schemes. "P" indicates a standard PoDLE commitment. Format: "P" + hex(commitment)

def to_revelation(self) ‑> dict[str, str]
Expand source code
def to_revelation(self) -> dict[str, str]:
    """Build the revelation dict sent to the maker.

    The byte fields are hex-encoded under the keys "P", "P2", "sig" and
    "e"; "utxo" carries the UTXO string unchanged.
    """
    byte_fields = {"P": self.p, "P2": self.p2, "sig": self.sig, "e": self.e}
    revelation = {key: raw.hex() for key, raw in byte_fields.items()}
    revelation["utxo"] = self.utxo
    return revelation

Convert to revelation format for sending to maker.

class PoDLEError (*args, **kwargs)
Expand source code
class PoDLEError(Exception):
    """Raised when PoDLE generation or verification fails."""

PoDLE generation or verification error.

Ancestors

  • builtins.Exception
  • builtins.BaseException