Module jmcore.protocol

JoinMarket protocol definitions, message types, and serialization.

Feature Flag System

This implementation uses feature flags for capability negotiation instead of protocol version bumping. This allows incremental feature adoption while maintaining full compatibility with the reference implementation from joinmarket-clientserver.

Features are advertised in the handshake features dict and negotiated per-CoinJoin session via extended !fill/!pubkey messages.

Available Features:
- neutrino_compat: Extended UTXO metadata (scriptpubkey, blockheight) for light client verification. Required for Neutrino backend takers.
- push_encrypted: Encrypted !push command with session binding. Prevents abuse of makers as unauthenticated broadcast bots.

Feature Dependencies:
- neutrino_compat: No dependencies
- push_encrypted: Requires active NaCl encryption session (implicit)

Nick Format:

JoinMarket nicks encode the protocol version: J{version}{hash}. All nicks use version 5 for maximum compatibility with the reference implementation. Feature detection happens via the handshake and the !fill/!pubkey exchange, not via the nick.

Cross-Implementation Compatibility:

Our Implementation ↔ Reference (JAM):
- We use J5 nicks and proto-ver=5 in handshake
- Features field is ignored by reference implementation
- Legacy UTXO format used unless both peers advertise neutrino_compat
- Graceful fallback to v5 behavior for all features

Feature Negotiation During CoinJoin:
- Taker advertises features in !fill (optional JSON suffix)
- Maker responds with features in !pubkey (optional JSON suffix)
- Extended formats used only when both peers support the feature
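The negotiation rule above amounts to a set intersection. The following is a minimal sketch using plain Python sets rather than the module's FeatureSet; the helper names `negotiate` and `use_extended_utxo_format` are hypothetical, and the feature name strings are taken from the list of available features.

```python
def negotiate(taker_features: set[str], maker_features: set[str]) -> set[str]:
    """Return the features both peers advertised (hypothetical helper)."""
    return taker_features & maker_features


def use_extended_utxo_format(taker_features: set[str], maker_features: set[str]) -> bool:
    """Extended UTXO strings apply only when both sides advertise neutrino_compat."""
    return "neutrino_compat" in negotiate(taker_features, maker_features)


taker = {"neutrino_compat", "push_encrypted"}
legacy_maker: set[str] = set()       # e.g. a peer that sends no features at all
modern_maker = {"neutrino_compat"}

print(use_extended_utxo_format(taker, legacy_maker))  # False
print(use_extended_utxo_format(taker, modern_maker))  # True
```

A peer that advertises nothing yields an empty intersection, which is how the fallback to v5 behavior falls out of the same rule.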

Peerlist Feature Extension:

Our directory server extends the peerlist format to include features:
- Legacy format: nick;location (or nick;location;D for disconnected)
- Extended format: nick;location;F:feature1+feature2 (features as plus-separated list)

The extended format is backward compatible: legacy clients will ignore the F: suffix. Note: the plus separator is used because the peerlist itself uses commas to separate entries.
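To illustrate why the feature list uses a plus separator, here is a small self-contained sketch of splitting a whole peerlist. The function name `split_peerlist`, the nicks, and the onion address are hypothetical; only the `;`, `,`, `D`, and `F:` conventions come from the description above.

```python
def split_peerlist(peerlist: str) -> list[tuple[str, str, bool, set[str]]]:
    """Split a comma-separated peerlist into (nick, location, disconnected, features)."""
    peers = []
    for entry in peerlist.split(","):
        parts = entry.split(";")
        if len(parts) < 2:
            raise ValueError(f"Invalid peerlist entry: {entry}")
        nick, location = parts[0], parts[1]
        disconnected = "D" in parts[2:]
        features: set[str] = set()
        for part in parts[2:]:
            if part.startswith("F:"):
                # Features inside one entry are joined with '+', never ','
                features = {f for f in part[2:].split("+") if f}
        peers.append((nick, location, disconnected, features))
    return peers


PEERLIST = (
    "J5alice;alice.onion:5222;F:neutrino_compat+push_encrypted,"
    "J5bob;NOT-SERVING-ONION;D"
)
peers = split_peerlist(PEERLIST)
```

If features were comma-separated instead, the second feature name would be cut off into its own "entry" by the outer split and rejected as malformed.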

Functions

def create_handshake_request(nick: str,
location: str,
network: str,
directory: bool = False,
neutrino_compat: bool = False,
features: FeatureSet | None = None) ‑> dict[str, typing.Any]
def create_handshake_request(
    nick: str,
    location: str,
    network: str,
    directory: bool = False,
    neutrino_compat: bool = False,
    features: FeatureSet | None = None,
) -> dict[str, Any]:
    """
    Create a handshake request message.

    Args:
        nick: Bot nickname
        location: Onion address or NOT-SERVING-ONION
        network: Bitcoin network (mainnet, testnet, signet, regtest)
        directory: True if this is a directory server
        neutrino_compat: True to advertise Neutrino-compatible UTXO metadata support
        features: FeatureSet to advertise (overrides neutrino_compat if provided)

    Returns:
        Handshake request payload dict
    """
    if features is not None:
        features_dict = features.to_dict()
    else:
        features_dict = {}
        if neutrino_compat:
            features_dict[FEATURE_NEUTRINO_COMPAT] = True

    return {
        "app-name": "joinmarket",
        "directory": directory,
        "location-string": location,
        "proto-ver": JM_VERSION,
        "features": features_dict,
        "nick": nick,
        "network": network,
    }

Create a handshake request message.

Args

nick
Bot nickname
location
Onion address or NOT-SERVING-ONION
network
Bitcoin network (mainnet, testnet, signet, regtest)
directory
True if this is a directory server
neutrino_compat
True to advertise Neutrino-compatible UTXO metadata support
features
FeatureSet to advertise (overrides neutrino_compat if provided)

Returns

Handshake request payload dict

def create_handshake_response(nick: str,
network: str,
accepted: bool = True,
motd: str = 'JoinMarket Directory Server',
neutrino_compat: bool = False,
features: FeatureSet | None = None) ‑> dict[str, typing.Any]
def create_handshake_response(
    nick: str,
    network: str,
    accepted: bool = True,
    motd: str = "JoinMarket Directory Server",
    neutrino_compat: bool = False,
    features: FeatureSet | None = None,
) -> dict[str, Any]:
    """
    Create a handshake response message.

    Args:
        nick: Directory server nickname
        network: Bitcoin network
        accepted: Whether the connection is accepted
        motd: Message of the day
        neutrino_compat: True to advertise Neutrino-compatible UTXO metadata support
        features: FeatureSet to advertise (overrides neutrino_compat if provided)

    Returns:
        Handshake response payload dict
    """
    if features is not None:
        features_dict = features.to_dict()
    else:
        features_dict = {}
        if neutrino_compat:
            features_dict[FEATURE_NEUTRINO_COMPAT] = True

    return {
        "app-name": "joinmarket",
        "directory": True,
        "proto-ver-min": JM_VERSION,
        "proto-ver-max": JM_VERSION,
        "features": features_dict,
        "accepted": accepted,
        "nick": nick,
        "network": network,
        "motd": motd,
    }

Create a handshake response message.

Args

nick
Directory server nickname
network
Bitcoin network
accepted
Whether the connection is accepted
motd
Message of the day
neutrino_compat
True to advertise Neutrino-compatible UTXO metadata support
features
FeatureSet to advertise (overrides neutrino_compat if provided)

Returns

Handshake response payload dict

def create_peerlist_entry(nick: str,
location: str,
disconnected: bool = False,
features: FeatureSet | None = None) ‑> str
def create_peerlist_entry(
    nick: str,
    location: str,
    disconnected: bool = False,
    features: FeatureSet | None = None,
) -> str:
    """
    Create a peerlist entry string.

    Format:
    - Legacy: nick;location or nick;location;D
    - Extended: nick;location;F:feature1+feature2 or nick;location;D;F:feature1+feature2

    The F: prefix identifies the features field and keeps the format backward compatible.
    Features are joined with '+' because the peerlist uses ',' to separate entries.
    """
    entry = f"{nick}{NICK_PEERLOCATOR_SEPARATOR}{location}"
    if disconnected:
        entry += f"{NICK_PEERLOCATOR_SEPARATOR}D"
    if features and features.features:
        entry += f"{NICK_PEERLOCATOR_SEPARATOR}F:{features.to_comma_string()}"
    return entry

Create a peerlist entry string.

Format:
- Legacy: nick;location or nick;location;D
- Extended: nick;location;F:feature1+feature2 or nick;location;D;F:feature1+feature2

The F: prefix is used to identify the features field and maintain backward compatibility.

def format_jm_message(from_nick: str, to_nick: str, cmd: str, message: str) ‑> str
def format_jm_message(from_nick: str, to_nick: str, cmd: str, message: str) -> str:
    """Join sender, recipient, and 'cmd message' with COMMAND_PREFIX into one wire string."""
    return f"{from_nick}{COMMAND_PREFIX}{to_nick}{COMMAND_PREFIX}{cmd} {message}"

def format_utxo_list(utxos: list[UTXOMetadata],
extended: bool = False) ‑> str
def format_utxo_list(utxos: list[UTXOMetadata], extended: bool = False) -> str:
    """
    Format a list of UTXOs as comma-separated string.

    Args:
        utxos: List of UTXOMetadata objects
        extended: If True, use extended format with scriptpubkey:blockheight

    Returns:
        Comma-separated UTXO string
    """
    if extended:
        return ",".join(u.to_extended_str() for u in utxos)
    else:
        return ",".join(u.to_legacy_str() for u in utxos)

Format a list of UTXOs as comma-separated string.

Args

utxos
List of UTXOMetadata objects
extended
If True, use extended format with scriptpubkey:blockheight

Returns

Comma-separated UTXO string

def get_nick_version(nick: str) ‑> int
def get_nick_version(nick: str) -> int:
    """
    Extract protocol version from a JoinMarket nick.

    Nick format: J{version}{hash} where version is a single digit.
    Example: J5abc123... (v5)

    Returns JM_VERSION (5) if version cannot be determined.
    """
    if nick and len(nick) >= 2 and nick[0] == "J" and nick[1].isdigit():
        return int(nick[1])
    return JM_VERSION

Extract protocol version from a JoinMarket nick.

Nick format: J{version}{hash} where version is a single digit. Example: J5abc123… (v5)

Returns JM_VERSION (5) if version cannot be determined.

def parse_jm_message(msg: str) ‑> tuple[str, str, str] | None
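def parse_jm_message(msg: str) -> tuple[str, str, str] | None:
    """
    Split a wire message into (from_nick, to_nick, rest).

    rest is everything after the second COMMAND_PREFIX (the command and its
    arguments). Returns None if the message has fewer than three parts.
    """
    try:
        parts = msg.split(COMMAND_PREFIX)
        if len(parts) < 3:
            return None
        from_nick = parts[0]
        to_nick = parts[1]
        # Re-join the tail so a COMMAND_PREFIX inside the arguments is preserved
        rest = COMMAND_PREFIX.join(parts[2:])
        return (from_nick, to_nick, rest)
    except Exception:
        return None
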
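A round-trip sketch of the two message helpers follows. Both functions are re-declared here so the block runs on its own, and `COMMAND_PREFIX = "!"` is an assumption, since the constant's value is not shown on this page; the nicks and arguments are made up.

```python
from typing import Optional

COMMAND_PREFIX = "!"  # assumed value; not shown in this page


def format_jm_message(from_nick: str, to_nick: str, cmd: str, message: str) -> str:
    return f"{from_nick}{COMMAND_PREFIX}{to_nick}{COMMAND_PREFIX}{cmd} {message}"


def parse_jm_message(msg: str) -> Optional[tuple[str, str, str]]:
    parts = msg.split(COMMAND_PREFIX)
    if len(parts) < 3:
        return None
    # Everything after the second prefix is returned as one string
    return (parts[0], parts[1], COMMAND_PREFIX.join(parts[2:]))


wire = format_jm_message("J5alice", "J5bob", "fill", "0 1000000")
parsed = parse_jm_message(wire)
```

Note that the parser returns the command and its arguments together as `rest`; separating `cmd` from `message` is left to the caller.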
def parse_peer_location(location: str) ‑> tuple[str, int]
def parse_peer_location(location: str) -> tuple[str, int]:
    """
    Split a 'host:port' location string into (host, port).

    Returns (location, -1) for NOT_SERVING_ONION_HOSTNAME. Raises ValueError
    if the string is malformed or the port is outside 1-65535.
    """
    if location == NOT_SERVING_ONION_HOSTNAME:
        return (location, -1)
    try:
        host, port_str = location.split(":")
        port = int(port_str)
        if port <= 0 or port > 65535:
            raise ValueError(f"Invalid port: {port}")
        return (host, port)
    except (ValueError, AttributeError) as e:
        raise ValueError(f"Invalid location string: {location}") from e

def parse_peerlist_entry(entry: str) ‑> tuple[str, str, bool, FeatureSet]
def parse_peerlist_entry(entry: str) -> tuple[str, str, bool, FeatureSet]:
    """
    Parse a peerlist entry string.

    Returns:
        Tuple of (nick, location, disconnected, features)
    """
    parts = entry.split(NICK_PEERLOCATOR_SEPARATOR)
    if len(parts) < 2:
        raise ValueError(f"Invalid peerlist entry: {entry}")

    nick = parts[0]
    location = parts[1]
    disconnected = False
    features = FeatureSet()

    # Parse remaining parts
    for part in parts[2:]:
        if part == "D":
            disconnected = True
        elif part.startswith("F:"):
            features = FeatureSet.from_comma_string(part[2:])

    return (nick, location, disconnected, features)

Parse a peerlist entry string.

Returns

Tuple of (nick, location, disconnected, features)

def parse_utxo_list(utxo_list_str: str, require_metadata: bool = False) ‑> list[UTXOMetadata]
def parse_utxo_list(utxo_list_str: str, require_metadata: bool = False) -> list[UTXOMetadata]:
    """
    Parse a comma-separated list of UTXOs.

    Args:
        utxo_list_str: Comma-separated UTXOs (legacy or extended format)
        require_metadata: If True, raise error if any UTXO lacks Neutrino metadata

    Returns:
        List of UTXOMetadata objects
    """
    if not utxo_list_str:
        return []

    utxos = []
    for utxo_str in utxo_list_str.split(","):
        utxo = UTXOMetadata.from_str(utxo_str.strip())
        if require_metadata and not utxo.has_neutrino_metadata():
            raise ValueError(f"UTXO {utxo.to_legacy_str()} missing Neutrino metadata")
        utxos.append(utxo)
    return utxos

Parse a comma-separated list of UTXOs.

Args

utxo_list_str
Comma-separated UTXOs (legacy or extended format)
require_metadata
If True, raise error if any UTXO lacks Neutrino metadata

Returns

List of UTXOMetadata objects
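The effect of `require_metadata` can be seen in a small stand-alone sketch. `Utxo`, `parse_utxo`, and `parse_utxos` are simplified, hypothetical stand-ins for UTXOMetadata and parse_utxo_list, and the txid and scriptPubKey values are dummy hex strings.

```python
from typing import NamedTuple, Optional


class Utxo(NamedTuple):
    """Simplified stand-in for UTXOMetadata (illustration only)."""
    txid: str
    vout: int
    scriptpubkey: Optional[str] = None
    blockheight: Optional[int] = None


def parse_utxo(s: str) -> Utxo:
    parts = s.split(":")
    if len(parts) == 2:   # legacy: txid:vout
        return Utxo(parts[0], int(parts[1]))
    if len(parts) == 4:   # extended: txid:vout:scriptpubkey:blockheight
        return Utxo(parts[0], int(parts[1]), parts[2], int(parts[3]))
    raise ValueError(f"Invalid UTXO format: {s}")


def parse_utxos(utxo_list_str: str, require_metadata: bool = False) -> list[Utxo]:
    utxos = []
    for item in utxo_list_str.split(",") if utxo_list_str else []:
        utxo = parse_utxo(item.strip())
        if require_metadata and (utxo.scriptpubkey is None or utxo.blockheight is None):
            raise ValueError(f"UTXO {utxo.txid}:{utxo.vout} missing Neutrino metadata")
        utxos.append(utxo)
    return utxos


TXID = "ab" * 32            # dummy 32-byte txid in hex
SPK = "0014" + "cd" * 20    # dummy P2WPKH-shaped scriptPubKey in hex
mixed = f"{TXID}:0, {TXID}:1:{SPK}:800000"
parsed = parse_utxos(mixed)
```

A mixed list parses fine by default; with `require_metadata=True` the first legacy entry causes a ValueError, which is the behavior a Neutrino-backed taker would want.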

def peer_supports_neutrino_compat(handshake_data: dict[str, Any]) ‑> bool
def peer_supports_neutrino_compat(handshake_data: dict[str, Any]) -> bool:
    """
    Check if a peer supports Neutrino-compatible UTXO metadata.

    Args:
        handshake_data: Handshake payload from peer

    Returns:
        True if peer advertises neutrino_compat feature
    """
    features = handshake_data.get("features", {})
    return features.get(FEATURE_NEUTRINO_COMPAT, False)

Check if a peer supports Neutrino-compatible UTXO metadata.

Args

handshake_data
Handshake payload from peer

Returns

True if peer advertises neutrino_compat feature
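As a sketch of how feature detection behaves against peers that send no features field, the block below reads features from two example handshake payloads. The helper name `features_from_handshake`, the nicks, and the literal key "neutrino_compat" (assumed to be the value of FEATURE_NEUTRINO_COMPAT) are assumptions.

```python
from typing import Any


def features_from_handshake(handshake: dict[str, Any]) -> set[str]:
    """Collect feature names whose value is exactly True; a missing dict means none."""
    return {k for k, v in handshake.get("features", {}).items() if v is True}


# A peer that omits the features field entirely
plain_peer = {"app-name": "joinmarket", "proto-ver": 5, "nick": "J5plain"}

# A peer that advertises one feature and explicitly disables another
extended_peer = {
    "app-name": "joinmarket",
    "proto-ver": 5,
    "nick": "J5extended",
    "features": {"neutrino_compat": True, "push_encrypted": False},
}

plain = features_from_handshake(plain_peer)
extended = features_from_handshake(extended_peer)
```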

Classes

class FeatureSet (*args: Any, **kwargs: Any)
@dataclass
class FeatureSet:
    """
    Represents a set of protocol features advertised by a peer.

    Used for feature negotiation during handshake and CoinJoin sessions.
    """

    features: set[str] = Field(default_factory=set)

    @classmethod
    def from_handshake(cls, handshake_data: dict[str, Any]) -> FeatureSet:
        """Extract features from a handshake payload."""
        features_dict = handshake_data.get("features", {})
        # Only include features that are set to True
        features = {k for k, v in features_dict.items() if v is True}
        return cls(features=features)

    @classmethod
    def from_list(cls, feature_list: list[str]) -> FeatureSet:
        """Create from a list of feature names."""
        return cls(features=set(feature_list))

    @classmethod
    def from_comma_string(cls, s: str) -> FeatureSet:
        """Parse from plus-separated string (e.g., 'neutrino_compat+push_encrypted').

        Note: Despite the method name, uses '+' as separator because the peerlist
        itself uses ',' to separate entries. The name is kept for backward compatibility.
        Also accepts ',' for legacy/handshake use cases.
        """
        if not s or not s.strip():
            return cls(features=set())
        # Support both + (peerlist) and , (legacy/handshake) separators
        if "+" in s:
            return cls(features={f.strip() for f in s.split("+") if f.strip()})
        return cls(features={f.strip() for f in s.split(",") if f.strip()})

    def to_dict(self) -> dict[str, bool]:
        """Convert to dict for JSON serialization."""
        return dict.fromkeys(sorted(self.features), True)

    def to_comma_string(self) -> str:
        """Convert to plus-separated string for peerlist F: suffix.

        Note: Uses '+' as separator instead of ',' because the peerlist
        itself uses ',' to separate entries. Using ',' for features would
        cause parsing ambiguity.
        """
        return "+".join(sorted(self.features))

    def supports(self, feature: str) -> bool:
        """Check if this set includes a specific feature."""
        return feature in self.features

    def supports_neutrino_compat(self) -> bool:
        """Check if neutrino_compat is supported."""
        return FEATURE_NEUTRINO_COMPAT in self.features

    def supports_push_encrypted(self) -> bool:
        """Check if push_encrypted is supported."""
        return FEATURE_PUSH_ENCRYPTED in self.features

    def supports_peerlist_features(self) -> bool:
        """Check if peer supports extended peerlist with features (F: suffix)."""
        return FEATURE_PEERLIST_FEATURES in self.features

    def validate_dependencies(self) -> tuple[bool, str]:
        """Check that all feature dependencies are satisfied."""
        for feature in self.features:
            deps = FEATURE_DEPENDENCIES.get(feature, [])
            for dep in deps:
                if dep not in self.features:
                    return False, f"Feature '{feature}' requires '{dep}'"
        return True, ""

    def intersection(self, other: FeatureSet) -> FeatureSet:
        """Return features supported by both sets."""
        return FeatureSet(features=self.features & other.features)

    def __bool__(self) -> bool:
        """True if any features are set."""
        return bool(self.features)

    def __contains__(self, feature: str) -> bool:
        return feature in self.features

    def __iter__(self):
        return iter(self.features)

    def __len__(self) -> int:
        return len(self.features)

Represents a set of protocol features advertised by a peer.

Used for feature negotiation during handshake and CoinJoin sessions.

Static methods

def from_comma_string(s: str) ‑> FeatureSet

Parse from plus-separated string (e.g., 'neutrino_compat+push_encrypted').

Note: Despite the method name, uses '+' as separator because the peerlist itself uses ',' to separate entries. The name is kept for backward compatibility. Also accepts ',' for legacy/handshake use cases.
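The separator handling can be sketched on its own with a plain function. `parse_features` is a hypothetical name that mirrors the logic shown in the class source; the single-letter feature names in the last example are placeholders.

```python
def parse_features(s: str) -> set[str]:
    """Split on '+' if one is present, otherwise on ','; ignore empty items."""
    if not s or not s.strip():
        return set()
    separator = "+" if "+" in s else ","
    return {f.strip() for f in s.split(separator) if f.strip()}


plus_form = parse_features("neutrino_compat+push_encrypted")
comma_form = parse_features("neutrino_compat, push_encrypted")
mixed_form = parse_features("a,b+c")
```

Because the presence of a single '+' selects the separator for the whole string, a mixed input such as "a,b+c" is split only on '+', so "a,b" survives as one item. Callers should stick to one separator per string.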

def from_handshake(handshake_data: dict[str, Any]) ‑> FeatureSet

Extract features from a handshake payload.

def from_list(feature_list: list[str]) ‑> FeatureSet

Create from a list of feature names.

Instance variables

var features : set[str]

Methods

def intersection(self,
other: FeatureSet) ‑> FeatureSet
def intersection(self, other: FeatureSet) -> FeatureSet:
    """Return features supported by both sets."""
    return FeatureSet(features=self.features & other.features)

Return features supported by both sets.

def supports(self, feature: str) ‑> bool
def supports(self, feature: str) -> bool:
    """Check if this set includes a specific feature."""
    return feature in self.features

Check if this set includes a specific feature.

def supports_neutrino_compat(self) ‑> bool
def supports_neutrino_compat(self) -> bool:
    """Check if neutrino_compat is supported."""
    return FEATURE_NEUTRINO_COMPAT in self.features

Check if neutrino_compat is supported.

def supports_peerlist_features(self) ‑> bool
def supports_peerlist_features(self) -> bool:
    """Check if peer supports extended peerlist with features (F: suffix)."""
    return FEATURE_PEERLIST_FEATURES in self.features

Check if peer supports extended peerlist with features (F: suffix).

def supports_push_encrypted(self) ‑> bool
def supports_push_encrypted(self) -> bool:
    """Check if push_encrypted is supported."""
    return FEATURE_PUSH_ENCRYPTED in self.features

Check if push_encrypted is supported.

def to_comma_string(self) ‑> str
def to_comma_string(self) -> str:
    """Convert to plus-separated string for peerlist F: suffix.

    Note: Uses '+' as separator instead of ',' because the peerlist
    itself uses ',' to separate entries. Using ',' for features would
    cause parsing ambiguity.
    """
    return "+".join(sorted(self.features))

Convert to plus-separated string for peerlist F: suffix.

Note: Uses '+' as separator instead of ',' because the peerlist itself uses ',' to separate entries. Using ',' for features would cause parsing ambiguity.

def to_dict(self) ‑> dict[str, bool]
def to_dict(self) -> dict[str, bool]:
    """Convert to dict for JSON serialization."""
    return dict.fromkeys(sorted(self.features), True)

Convert to dict for JSON serialization.

def validate_dependencies(self) ‑> tuple[bool, str]
def validate_dependencies(self) -> tuple[bool, str]:
    """Check that all feature dependencies are satisfied."""
    for feature in self.features:
        deps = FEATURE_DEPENDENCIES.get(feature, [])
        for dep in deps:
            if dep not in self.features:
                return False, f"Feature '{feature}' requires '{dep}'"
    return True, ""

Check that all feature dependencies are satisfied.
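A sketch of the dependency check against a made-up table follows. The contents of FEATURE_DEPENDENCIES are not shown on this page, so `DEPENDENCIES`, `feature_a`, and `feature_b` below are purely hypothetical.

```python
# Hypothetical dependency table: feature_b cannot be used without feature_a
DEPENDENCIES: dict[str, list[str]] = {"feature_b": ["feature_a"]}


def validate(features: set[str]) -> tuple[bool, str]:
    """Return (True, "") if every dependency is present, else (False, reason)."""
    for feature in sorted(features):  # sorted for a deterministic error message
        for dep in DEPENDENCIES.get(feature, []):
            if dep not in features:
                return False, f"Feature '{feature}' requires '{dep}'"
    return True, ""


ok, reason = validate({"feature_b"})
```

The `(bool, str)` return shape lets a caller log the reason without having to catch an exception.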

class MessageType (*values)
class MessageType(IntEnum):
    PRIVMSG = 685
    PUBMSG = 687
    PEERLIST = 789
    GETPEERLIST = 791
    HANDSHAKE = 793
    DN_HANDSHAKE = 795
    PING = 797
    PONG = 799
    DISCONNECT = 801

    CONNECT = 785
    # Same value as PING, so IntEnum treats CONNECT_IN as an alias of PING
    CONNECT_IN = 797

Enum where members are also (and must be) ints

Ancestors

  • enum.IntEnum
  • builtins.int
  • enum.ReprEnum
  • enum.Enum

Class variables

var CONNECT

var CONNECT_IN

var DISCONNECT

var DN_HANDSHAKE

var GETPEERLIST

var HANDSHAKE

var PEERLIST

var PING

var PONG

var PRIVMSG

var PUBMSG

class ProtocolMessage (**data: Any)
class ProtocolMessage(BaseModel):
    type: MessageType
    payload: dict[str, Any]

    def to_json(self) -> str:
        return json.dumps({"type": self.type.value, "data": self.payload})

    @classmethod
    def from_json(cls, data: str) -> ProtocolMessage:
        obj = json.loads(data)
        return cls(type=MessageType(obj["type"]), payload=obj["data"])

    def to_bytes(self) -> bytes:
        return self.to_json().encode("utf-8")

    @classmethod
    def from_bytes(cls, data: bytes) -> ProtocolMessage:
        return cls.from_json(data.decode("utf-8"))
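The wire envelope can be sketched with the standard library alone, without Pydantic. `encode` and `decode` are hypothetical helper names, the enum is a three-member subset of MessageType, and the nick is made up; the `{"type": ..., "data": ...}` shape is taken from to_json above.

```python
import json
from enum import IntEnum
from typing import Any


class MessageType(IntEnum):
    HANDSHAKE = 793
    PING = 797
    CONNECT_IN = 797  # duplicate value: Python makes this an alias of PING


def encode(msg_type: MessageType, payload: dict[str, Any]) -> bytes:
    """Serialize to the {"type": int, "data": dict} JSON envelope as UTF-8 bytes."""
    return json.dumps({"type": msg_type.value, "data": payload}).encode("utf-8")


def decode(raw: bytes) -> tuple[MessageType, dict[str, Any]]:
    """Parse the envelope; an unknown type number raises ValueError."""
    obj = json.loads(raw.decode("utf-8"))
    return MessageType(obj["type"]), obj["data"]


raw = encode(MessageType.HANDSHAKE, {"nick": "J5example"})
msg_type, payload = decode(raw)
```

One consequence of the shared value 797 is that decoding can only ever yield PING, never CONNECT_IN, since an enum lookup by value returns the first member defined with that value.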


Ancestors

  • pydantic.main.BaseModel

Class variables

var model_config

var payload : dict[str, typing.Any]

var type : MessageType

Static methods

def from_bytes(data: bytes) ‑> ProtocolMessage
def from_json(data: str) ‑> ProtocolMessage

Methods

def to_bytes(self) ‑> bytes
def to_bytes(self) -> bytes:
    return self.to_json().encode("utf-8")

def to_json(self) ‑> str
def to_json(self) -> str:
    return json.dumps({"type": self.type.value, "data": self.payload})

class RequiredFeatures (*args: Any, **kwargs: Any)
@dataclass
class RequiredFeatures:
    """
    Features that this peer requires from counterparties.

    Used to filter incompatible peers during maker selection.
    """

    required: set[str] = Field(default_factory=set)

    @classmethod
    def for_neutrino_taker(cls) -> RequiredFeatures:
        """Create requirements for a taker using Neutrino backend."""
        return cls(required={FEATURE_NEUTRINO_COMPAT})

    @classmethod
    def none(cls) -> RequiredFeatures:
        """No required features."""
        return cls(required=set())

    def is_compatible(self, peer_features: FeatureSet) -> tuple[bool, str]:
        """Check if peer supports all required features."""
        missing = self.required - peer_features.features
        if missing:
            return False, f"Missing required features: {missing}"
        return True, ""

    def __bool__(self) -> bool:
        return bool(self.required)

Features that this peer requires from counterparties.

Used to filter incompatible peers during maker selection.

Static methods

def for_neutrino_taker() ‑> RequiredFeatures

Create requirements for a taker using Neutrino backend.

def none() ‑> RequiredFeatures

No required features.

Instance variables

var required : set[str]

Methods

def is_compatible(self,
peer_features: FeatureSet) ‑> tuple[bool, str]
def is_compatible(self, peer_features: FeatureSet) -> tuple[bool, str]:
    """Check if peer supports all required features."""
    missing = self.required - peer_features.features
    if missing:
        return False, f"Missing required features: {missing}"
    return True, ""

Check if peer supports all required features.
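Maker filtering with this check can be sketched as a set difference over plain sets. `missing_features` and `compatible_makers` are hypothetical helpers, and the maker nicks are made up.

```python
def missing_features(required: set[str], peer_features: set[str]) -> set[str]:
    """Return the required features the peer does not advertise."""
    return required - peer_features


def compatible_makers(required: set[str], makers: dict[str, set[str]]) -> list[str]:
    """Keep only makers that advertise every required feature."""
    return sorted(
        nick for nick, feats in makers.items()
        if not missing_features(required, feats)
    )


makers = {
    "J5maker1": {"neutrino_compat", "push_encrypted"},
    "J5maker2": set(),                  # advertises nothing
    "J5maker3": {"push_encrypted"},
}

neutrino_taker = compatible_makers({"neutrino_compat"}, makers)
full_node_taker = compatible_makers(set(), makers)
```

With an empty requirement set every maker passes, which matches the intent of RequiredFeatures.none().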

class UTXOMetadata (*args: Any, **kwargs: Any)
@dataclass
class UTXOMetadata:
    """
    Extended UTXO metadata for Neutrino-compatible verification.

    This allows light clients to verify UTXOs without arbitrary blockchain queries
    by providing the scriptPubKey (for Neutrino watch list) and block height
    (for efficient rescan starting point).
    """

    txid: str
    vout: int
    scriptpubkey: str | None = None  # Hex-encoded scriptPubKey
    blockheight: int | None = None  # Block height where UTXO was confirmed

    def to_legacy_str(self) -> str:
        """Format as legacy string: txid:vout"""
        return f"{self.txid}:{self.vout}"

    def to_extended_str(self) -> str:
        """Format as extended string: txid:vout:scriptpubkey:blockheight"""
        if self.scriptpubkey is None or self.blockheight is None:
            return self.to_legacy_str()
        return f"{self.txid}:{self.vout}:{self.scriptpubkey}:{self.blockheight}"

    @classmethod
    def from_str(cls, s: str) -> UTXOMetadata:
        """
        Parse UTXO string in either legacy or extended format.

        Legacy format: txid:vout
        Extended format: txid:vout:scriptpubkey:blockheight
        """
        parts = s.split(":")
        if len(parts) == 2:
            # Legacy format
            return cls(txid=parts[0], vout=int(parts[1]))
        elif len(parts) == 4:
            # Extended format
            return cls(
                txid=parts[0],
                vout=int(parts[1]),
                scriptpubkey=parts[2],
                blockheight=int(parts[3]),
            )
        else:
            raise ValueError(f"Invalid UTXO format: {s}")

    def has_neutrino_metadata(self) -> bool:
        """Check if this UTXO has the metadata needed for Neutrino verification."""
        return self.scriptpubkey is not None and self.blockheight is not None

    @staticmethod
    def is_valid_scriptpubkey(scriptpubkey: str) -> bool:
        """Validate scriptPubKey format (hex string)."""
        if not scriptpubkey:
            return False
        # Must be valid hex
        if not re.match(r"^[0-9a-fA-F]+$", scriptpubkey):
            return False
        # Common scriptPubKey lengths (in hex chars):
        # P2PKH: 50 (25 bytes), P2SH: 46 (23 bytes)
        # P2WPKH: 44 (22 bytes), P2WSH: 68 (34 bytes)
        # P2TR: 68 (34 bytes)
        return not (len(scriptpubkey) < 4 or len(scriptpubkey) > 200)

Extended UTXO metadata for Neutrino-compatible verification.

This allows light clients to verify UTXOs without arbitrary blockchain queries by providing the scriptPubKey (for Neutrino watch list) and block height (for efficient rescan starting point).

Static methods

def from_str(s: str) ‑> UTXOMetadata

Parse UTXO string in either legacy or extended format.

Legacy format: txid:vout
Extended format: txid:vout:scriptpubkey:blockheight

def is_valid_scriptpubkey(scriptpubkey: str) ‑> bool
@staticmethod
def is_valid_scriptpubkey(scriptpubkey: str) -> bool:
    """Validate scriptPubKey format (hex string)."""
    if not scriptpubkey:
        return False
    # Must be valid hex
    if not re.match(r"^[0-9a-fA-F]+$", scriptpubkey):
        return False
    # Common scriptPubKey lengths (in hex chars):
    # P2PKH: 50 (25 bytes), P2SH: 46 (23 bytes)
    # P2WPKH: 44 (22 bytes), P2WSH: 68 (34 bytes)
    # P2TR: 68 (34 bytes)
    return not (len(scriptpubkey) < 4 or len(scriptpubkey) > 200)

Validate scriptPubKey format (hex string).

Instance variables

var blockheight : int | None

var scriptpubkey : str | None

var txid : str

var vout : int

Methods

def has_neutrino_metadata(self) ‑> bool
def has_neutrino_metadata(self) -> bool:
    """Check if this UTXO has the metadata needed for Neutrino verification."""
    return self.scriptpubkey is not None and self.blockheight is not None

Check if this UTXO has the metadata needed for Neutrino verification.

def to_extended_str(self) ‑> str
def to_extended_str(self) -> str:
    """Format as extended string: txid:vout:scriptpubkey:blockheight"""
    if self.scriptpubkey is None or self.blockheight is None:
        return self.to_legacy_str()
    return f"{self.txid}:{self.vout}:{self.scriptpubkey}:{self.blockheight}"

Format as extended string: txid:vout:scriptpubkey:blockheight

def to_legacy_str(self) ‑> str
def to_legacy_str(self) -> str:
    """Format as legacy string: txid:vout"""
    return f"{self.txid}:{self.vout}"

Format as legacy string: txid:vout