Module jmwallet.history

Transaction history tracking for CoinJoin operations.

Stores a simple CSV log of all CoinJoin transactions with key metadata:

- Role (maker/taker)
- Fees (paid/received)
- Peer count (only known by takers; None for makers)
- Transaction details

Functions

def append_history_entry(entry: TransactionHistoryEntry,
data_dir: Path | None = None) ‑> None
Expand source code
def append_history_entry(
    entry: TransactionHistoryEntry,
    data_dir: Path | None = None,
) -> None:
    """
    Append a transaction history entry to the CSV file.

    The header row is written first whenever the history file is missing or
    empty, so a zero-byte file never ends up holding data rows without a
    header. Errors raised while inspecting or writing the file are logged and
    swallowed: history logging is best-effort.

    Args:
        entry: The transaction history entry to append
        data_dir: Optional data directory (defaults to get_default_data_dir())
    """
    history_path = _get_history_path(data_dir)
    fieldnames = _get_fieldnames()

    try:
        # A missing *or* zero-byte file needs the header; checking existence
        # alone would leave an empty file permanently header-less, and a
        # later DictReader would then consume the first data row as header.
        write_header = not history_path.exists() or history_path.stat().st_size == 0

        with open(history_path, "a", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            if write_header:
                writer.writeheader()

            # Convert the dataclass instance to a plain {field name: value} row
            row = {field.name: getattr(entry, field.name) for field in fields(entry)}
            writer.writerow(row)

        logger.debug(f"Appended history entry: txid={entry.txid[:16]}... role={entry.role}")
    except Exception as e:
        logger.error(f"Failed to write history entry: {e}")

Append a transaction history entry to the CSV file.

Args

entry
The transaction history entry to append
data_dir
Optional data directory (defaults to get_default_data_dir())
def create_maker_history_entry(taker_nick: str,
cj_amount: int,
fee_received: int,
txfee_contribution: int,
cj_address: str,
change_address: str,
our_utxos: list[tuple[str, int]],
txid: str | None = None,
network: str = 'mainnet') ‑> TransactionHistoryEntry
Expand source code
def create_maker_history_entry(
    taker_nick: str,
    cj_amount: int,
    fee_received: int,
    txfee_contribution: int,
    cj_address: str,
    change_address: str,
    our_utxos: list[tuple[str, int]],
    txid: str | None = None,
    network: str = "mainnet",
) -> TransactionHistoryEntry:
    """
    Build the history record for a CoinJoin in which we acted as maker.

    The record starts out pending: success=False, confirmations=0 and a
    failure reason of "Pending confirmation". It is expected to be updated
    later, once the transaction is seen confirmed on-chain.

    Args:
        taker_nick: The taker's nick
        cj_amount: CoinJoin amount in sats
        fee_received: CoinJoin fee received
        txfee_contribution: Mining fee contribution
        cj_address: Our CoinJoin output address
        change_address: Our change output address
        our_utxos: List of (txid, vout) tuples for our inputs
        txid: Transaction ID (may not be known by maker); stored as "" if falsy
        network: Network name

    Returns:
        TransactionHistoryEntry ready to be appended (marked as pending)
    """
    created_at = datetime.now().isoformat()

    # Serialise our inputs as "txid:vout,txid:vout,..."
    inputs_csv = ",".join(f"{utxo_txid}:{utxo_vout}" for utxo_txid, utxo_vout in our_utxos)

    record = {
        "timestamp": created_at,
        "completed_at": "",  # stays empty until the transaction confirms
        "role": "maker",
        "success": False,  # pending
        "failure_reason": "Pending confirmation",
        "confirmations": 0,
        "confirmed_at": "",
        "txid": txid if txid else "",
        "cj_amount": cj_amount,
        "peer_count": None,  # a maker does not know the total peer count
        "counterparty_nicks": taker_nick,
        "fee_received": fee_received,
        "txfee_contribution": txfee_contribution,
        "net_fee": fee_received - txfee_contribution,
        "source_mixdepth": 0,  # not derived from the UTXOs here
        "destination_address": cj_address,
        "change_address": change_address,
        "utxos_used": inputs_csv,
        "network": network,
    }
    return TransactionHistoryEntry(**record)

Create a history entry for a maker CoinJoin (initially marked as pending).

The transaction is created with success=False and confirmations=0 to indicate it's pending confirmation. A background task should later update this entry once the transaction is confirmed on-chain.

Args

taker_nick
The taker's nick
cj_amount
CoinJoin amount in sats
fee_received
CoinJoin fee received
txfee_contribution
Mining fee contribution
cj_address
Our CoinJoin output address
change_address
Our change output address
our_utxos
List of (txid, vout) tuples for our inputs
txid
Transaction ID (may not be known by maker)
network
Network name

Returns

TransactionHistoryEntry ready to be appended (marked as pending)

def create_taker_history_entry(maker_nicks: list[str],
cj_amount: int,
total_maker_fees: int,
mining_fee: int,
destination: str,
source_mixdepth: int,
selected_utxos: list[tuple[str, int]],
txid: str,
broadcast_method: str = 'self',
network: str = 'mainnet',
success: bool = False,
failure_reason: str = 'Pending confirmation') ‑> TransactionHistoryEntry
Expand source code
def create_taker_history_entry(
    maker_nicks: list[str],
    cj_amount: int,
    total_maker_fees: int,
    mining_fee: int,
    destination: str,
    source_mixdepth: int,
    selected_utxos: list[tuple[str, int]],
    txid: str,
    broadcast_method: str = "self",
    network: str = "mainnet",
    success: bool = False,  # Default to pending
    failure_reason: str = "Pending confirmation",
) -> TransactionHistoryEntry:
    """
    Create a history entry for a taker CoinJoin (initially marked as pending).

    The transaction is created with success=False and confirmations=0 by default
    to indicate it's pending confirmation. A background task should later update
    this entry once the transaction is confirmed on-chain.

    If success=True is passed while failure_reason is left at its
    "Pending confirmation" placeholder, the placeholder is cleared so that a
    successful entry is not recorded with a failure reason. Any other
    failure_reason supplied by the caller is stored unchanged.

    Args:
        maker_nicks: List of maker nicks
        cj_amount: CoinJoin amount in sats
        total_maker_fees: Total maker fees paid
        mining_fee: Mining fee paid
        destination: Destination address
        source_mixdepth: Source mixdepth
        selected_utxos: List of (txid, vout) tuples for our inputs
        txid: Transaction ID
        broadcast_method: How the tx was broadcast
        network: Network name
        success: Whether the CoinJoin succeeded (default False for pending)
        failure_reason: Reason for failure if any (default "Pending confirmation")

    Returns:
        TransactionHistoryEntry ready to be appended
    """
    now = datetime.now().isoformat()
    net_fee = -(total_maker_fees + mining_fee)  # Negative = cost

    # The default reason is only a placeholder for pending entries; it must
    # not be attached to an entry that is created already successful.
    if success and failure_reason == "Pending confirmation":
        failure_reason = ""

    return TransactionHistoryEntry(
        timestamp=now,
        completed_at=now if success else "",
        role="taker",
        success=success,
        failure_reason=failure_reason,
        confirmations=0,
        confirmed_at="",
        txid=txid,
        cj_amount=cj_amount,
        peer_count=len(maker_nicks),
        counterparty_nicks=",".join(maker_nicks),
        total_maker_fees_paid=total_maker_fees,
        mining_fee_paid=mining_fee,
        net_fee=net_fee,
        source_mixdepth=source_mixdepth,
        destination_address=destination,
        # Distinct loop names so the `txid` parameter is not shadowed
        utxos_used=",".join(f"{utxo_txid}:{vout}" for utxo_txid, vout in selected_utxos),
        broadcast_method=broadcast_method,
        network=network,
    )

Create a history entry for a taker CoinJoin (initially marked as pending).

The transaction is created with success=False and confirmations=0 by default to indicate it's pending confirmation. A background task should later update this entry once the transaction is confirmed on-chain.

Args

maker_nicks
List of maker nicks
cj_amount
CoinJoin amount in sats
total_maker_fees
Total maker fees paid
mining_fee
Mining fee paid
destination
Destination address
source_mixdepth
Source mixdepth
selected_utxos
List of (txid, vout) tuples for our inputs
txid
Transaction ID
broadcast_method
How the tx was broadcast
network
Network name
success
Whether the CoinJoin succeeded (default False for pending)
failure_reason
Reason for failure if any (default "Pending confirmation")

Returns

TransactionHistoryEntry ready to be appended

async def detect_coinjoin_peer_count(backend: BlockchainBackend | Any, txid: str, cj_amount: int) ‑> int | None
Expand source code
async def detect_coinjoin_peer_count(
    backend: BlockchainBackend | Any,
    txid: str,
    cj_amount: int,
) -> int | None:
    """
    Count the outputs of a transaction whose value equals the CoinJoin amount.

    A maker does not learn how many participants a CoinJoin had while it is
    being built. The number of equal-amount outputs of the resulting
    transaction is used here as the peer count.

    Any exception raised along the way (import, fetch, parse, lookup) is
    logged as a warning and turned into a None result.

    Args:
        backend: Blockchain backend to fetch transaction data
        txid: Transaction ID to analyze
        cj_amount: The CoinJoin amount in satoshis

    Returns:
        Number of equal-amount outputs (peer count), or None if detection fails
    """
    try:
        # Imported lazily, inside the try, so an import failure is handled
        # like any other detection failure.
        from jmcore.bitcoin import parse_transaction

        fetched = await backend.get_transaction(txid)
        if not fetched:
            logger.warning(f"Could not fetch transaction {txid} for peer count detection")
            return None

        # Walk the parsed outputs and tally those paying exactly cj_amount
        matching = 0
        for tx_output in parse_transaction(fetched.raw).outputs:
            if tx_output["value"] == cj_amount:
                matching += 1

        if matching != 0:
            logger.debug(
                f"Detected {matching} equal-amount outputs "
                f"({cj_amount:,} sats each) in tx {txid[:16]}..."
            )
            return matching

        logger.warning(
            f"No outputs matching CoinJoin amount {cj_amount} sats in tx {txid[:16]}..."
        )
        return None

    except Exception as exc:
        logger.warning(f"Failed to detect peer count for tx {txid[:16]}...: {exc}")
        return None

Detect the number of CoinJoin participants by counting equal-amount outputs.

When makers participate in a CoinJoin, they don't know the total number of participants. However, once the transaction confirms, we can analyze it to count outputs with the CoinJoin amount.

Args

backend
Blockchain backend to fetch transaction data
txid
Transaction ID to analyze
cj_amount
The CoinJoin amount in satoshis

Returns

Number of equal-amount outputs (peer count), or None if detection fails

def get_history_stats(data_dir: Path | None = None) ‑> dict[str, int | float]
Expand source code
def get_history_stats(data_dir: Path | None = None) -> dict[str, int | float]:
    """
    Summarise the transaction history as a dict of aggregate figures.

    Every entry returned by read_history() is counted, whatever its success
    or confirmation state.

    Returns:
        Dict with statistics:
        - total_coinjoins: Total number of CoinJoins
        - maker_coinjoins: Number as maker
        - taker_coinjoins: Number as taker
        - total_volume: Total CJ amount in sats
        - total_fees_earned: Total fees earned as maker
        - total_fees_paid: Total fees paid as taker
        - success_rate: Percentage of successful CoinJoins
    """
    history = read_history(data_dir)

    # Running totals, filled in by a single pass over the history
    maker_count = 0
    taker_count = 0
    succeeded = 0
    volume = 0
    earned = 0
    paid = 0

    for record in history:
        volume += record.cj_amount
        if record.success:
            succeeded += 1
        if record.role == "maker":
            maker_count += 1
            earned += record.fee_received
        elif record.role == "taker":
            taker_count += 1
            paid += record.total_maker_fees_paid + record.mining_fee_paid

    # With no history at all the rate is reported as 0.0 rather than dividing by zero
    rate = succeeded / len(history) * 100 if history else 0.0

    return {
        "total_coinjoins": len(history),
        "maker_coinjoins": maker_count,
        "taker_coinjoins": taker_count,
        "total_volume": volume,
        "total_fees_earned": earned,
        "total_fees_paid": paid,
        "success_rate": rate,
    }

Get aggregate statistics from transaction history.

Returns

Dict with statistics:

- total_coinjoins: Total number of CoinJoins
- maker_coinjoins: Number as maker
- taker_coinjoins: Number as taker
- total_volume: Total CJ amount in sats
- total_fees_earned: Total fees earned as maker
- total_fees_paid: Total fees paid as taker
- success_rate: Percentage of successful CoinJoins

def get_pending_transactions(data_dir: Path | None = None) ‑> list[TransactionHistoryEntry]
Expand source code
def get_pending_transactions(data_dir: Path | None = None) -> list[TransactionHistoryEntry]:
    """
    Return the history entries that are still awaiting confirmation.

    An entry is pending when it is not marked successful and has zero
    confirmations. The txid is not inspected, so entries whose txid is still
    unknown (and needs discovery) are included as well.

    Returns:
        List of pending entries (includes entries without txid)
    """
    pending: list[TransactionHistoryEntry] = []
    for record in read_history(data_dir):
        # Skip anything already successful or already confirmed
        if record.success or record.confirmations != 0:
            continue
        pending.append(record)
    return pending

Get all pending (unconfirmed) transactions from history.

Returns entries that are:

- Not yet confirmed (success=False, confirmations=0)
- Either have a txid waiting for confirmation, or no txid yet (needs discovery)

Returns

List of pending entries (includes entries without txid)

def get_used_addresses(data_dir: Path | None = None) ‑> set[str]
Expand source code
def get_used_addresses(data_dir: Path | None = None) -> set[str]:
    """
    Collect every address that appears anywhere in the CoinJoin history.

    Both the destination (CoinJoin output) address and the change address of
    each entry are gathered, with no filtering on success or confirmation
    status. Empty address fields are ignored.

    This matters for privacy: an address that has been shared with peers,
    even in a transaction that failed or never confirmed, should never be
    reused.

    Args:
        data_dir: Optional data directory

    Returns:
        Set of addresses that should not be reused
    """
    seen: set[str] = set()
    for record in read_history(data_dir):
        candidates = (record.destination_address, record.change_address)
        seen.update(address for address in candidates if address)
    return seen

Get all addresses that have been used in CoinJoin history.

Returns both destination addresses (CoinJoin outputs) and change addresses from all history entries, regardless of success or confirmation status.

This is critical for privacy: once an address has been shared with peers (even if the transaction failed or wasn't confirmed), it should never be reused.

Args

data_dir
Optional data directory

Returns

Set of addresses that should not be reused

def read_history(data_dir: Path | None = None,
limit: int | None = None,
role_filter: "Literal['maker', 'taker'] | None" = None) ‑> list[TransactionHistoryEntry]
Expand source code
def read_history(
    data_dir: Path | None = None,
    limit: int | None = None,
    role_filter: Literal["maker", "taker"] | None = None,
) -> list[TransactionHistoryEntry]:
    """
    Read transaction history from the CSV file.

    Rows are parsed one by one. A row that cannot be converted is logged and
    skipped without affecting the remaining rows. Cells that are missing from
    a short row (csv.DictReader reports them as None) are treated exactly like
    columns that are absent from the file and fall back to the field default,
    so a single truncated row can no longer cause the whole history to be
    reported as empty.

    If the file itself cannot be opened or read, the error is logged and an
    empty list is returned.

    Args:
        data_dir: Optional data directory (defaults to get_default_data_dir())
        limit: Maximum number of entries to return (most recent first);
            None or 0 returns all entries
        role_filter: Filter by role (maker/taker)

    Returns:
        List of TransactionHistoryEntry objects, most recent first
    """
    history_path = _get_history_path(data_dir)

    if not history_path.exists():
        return []

    def _text(row: dict, key: str, default: str = "") -> str:
        # Missing column and missing cell (None) both yield the default.
        value = row.get(key)
        return default if value is None else value

    def _number(row: dict, key: str) -> int:
        # Missing, None and "" all count as 0; anything else must parse as int.
        return int(row.get(key) or 0)

    entries: list[TransactionHistoryEntry] = []

    try:
        with open(history_path, newline="", encoding="utf-8") as f:
            for row in csv.DictReader(f):
                # Convert string values back to appropriate types
                try:
                    raw_peer_count = row.get("peer_count")
                    entry = TransactionHistoryEntry(
                        timestamp=_text(row, "timestamp"),
                        completed_at=_text(row, "completed_at"),
                        role=_text(row, "role", "taker"),  # type: ignore
                        success=_text(row, "success", "True").lower() == "true",
                        failure_reason=_text(row, "failure_reason"),
                        confirmations=_number(row, "confirmations"),
                        confirmed_at=_text(row, "confirmed_at"),
                        txid=_text(row, "txid"),
                        cj_amount=_number(row, "cj_amount"),
                        # None is written to the CSV as "" (or "None" in some files)
                        peer_count=(
                            int(raw_peer_count)
                            if raw_peer_count not in (None, "", "None")
                            else None
                        ),
                        counterparty_nicks=_text(row, "counterparty_nicks"),
                        fee_received=_number(row, "fee_received"),
                        txfee_contribution=_number(row, "txfee_contribution"),
                        total_maker_fees_paid=_number(row, "total_maker_fees_paid"),
                        mining_fee_paid=_number(row, "mining_fee_paid"),
                        net_fee=_number(row, "net_fee"),
                        source_mixdepth=_number(row, "source_mixdepth"),
                        destination_address=_text(row, "destination_address"),
                        change_address=_text(row, "change_address"),
                        utxos_used=_text(row, "utxos_used"),
                        broadcast_method=_text(row, "broadcast_method"),
                        network=_text(row, "network", "mainnet"),
                    )
                except (ValueError, KeyError, TypeError, AttributeError) as e:
                    # TypeError/AttributeError are included so that an
                    # unexpected cell type skips one row instead of escaping
                    # to the outer handler and discarding everything.
                    logger.warning(f"Skipping malformed history row: {e}")
                    continue

                # Apply role filter
                if role_filter and entry.role != role_filter:
                    continue

                entries.append(entry)

    except Exception as e:
        logger.error(f"Failed to read history: {e}")
        return []

    # Sort by timestamp (most recent first) and apply limit
    entries.sort(key=lambda e: e.timestamp, reverse=True)
    if limit:
        entries = entries[:limit]

    return entries

Read transaction history from the CSV file.

Args

data_dir
Optional data directory (defaults to get_default_data_dir())
limit
Maximum number of entries to return (most recent first)
role_filter
Filter by role (maker/taker)

Returns

List of TransactionHistoryEntry objects

def update_pending_transaction_txid(destination_address: str, txid: str, data_dir: Path | None = None) ‑> bool
Expand source code
def update_pending_transaction_txid(
    destination_address: str,
    txid: str,
    data_dir: Path | None = None,
) -> bool:
    """
    Update a pending transaction's txid by matching the destination address.

    This is used when a maker doesn't initially know the txid (didn't receive !push),
    but can discover it later by finding which transaction paid to the CoinJoin address.

    The first entry (most recent first) whose destination address matches and
    whose txid is still empty is updated, and the whole history file is then
    rewritten. An empty destination_address or txid is rejected up front: an
    empty address would match any entry that has neither address nor txid, and
    an empty txid would rewrite the file without changing anything.

    Args:
        destination_address: The CoinJoin destination address to match
        txid: The discovered transaction ID
        data_dir: Optional data directory

    Returns:
        True if a matching entry was found and updated, False otherwise
    """
    if not destination_address or not txid:
        return False

    history_path = _get_history_path(data_dir)
    if not history_path.exists():
        return False

    entries = read_history(data_dir)

    # Match by destination address and empty txid (pending without txid)
    target = next(
        (e for e in entries if e.destination_address == destination_address and not e.txid),
        None,
    )
    if target is None:
        return False

    target.txid = txid
    logger.info(
        f"Updated pending transaction for {destination_address[:20]}... "
        f"with txid {txid[:16]}..."
    )

    # Rewrite the entire history file
    try:
        fieldnames = _get_fieldnames()
        with open(history_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for entry in entries:
                writer.writerow({field.name: getattr(entry, field.name) for field in fields(entry)})
        return True
    except Exception as e:
        logger.error(f"Failed to update history: {e}")
        return False

Update a pending transaction's txid by matching the destination address.

This is used when a maker doesn't initially know the txid (didn't receive !push), but can discover it later by finding which transaction paid to the CoinJoin address.

Args

destination_address
The CoinJoin destination address to match
txid
The discovered transaction ID
data_dir
Optional data directory

Returns

True if a matching entry was found and updated, False otherwise

def update_transaction_confirmation(txid: str, confirmations: int, data_dir: Path | None = None) ‑> bool
Expand source code
def update_transaction_confirmation(
    txid: str,
    confirmations: int,
    data_dir: Path | None = None,
) -> bool:
    """
    Update a transaction's confirmation status in the history file.

    This function rewrites the entire CSV file with the updated entry.
    If confirmations > 0, marks the transaction as successful.

    An empty txid is rejected up front. Pending entries whose txid has not
    been discovered yet are stored with an empty txid, so matching on ""
    would update (and possibly mark as confirmed) an unrelated entry.

    Note: This is the synchronous version. For makers who want automatic
    peer count detection, use update_transaction_confirmation_with_detection().

    Args:
        txid: Transaction ID to update
        confirmations: Current number of confirmations
        data_dir: Optional data directory

    Returns:
        True if transaction was found and updated, False otherwise
    """
    if not txid:
        return False

    history_path = _get_history_path(data_dir)
    if not history_path.exists():
        return False

    entries = read_history(data_dir)

    # Only the first match (entries are ordered most recent first) is updated
    target = next((e for e in entries if e.txid == txid), None)
    if target is None:
        return False

    target.confirmations = confirmations
    if confirmations > 0 and not target.success:
        # Mark as successful on first confirmation
        target.success = True
        target.failure_reason = ""
        target.confirmed_at = datetime.now().isoformat()
        target.completed_at = target.confirmed_at
        logger.info(f"Transaction {txid[:16]}... confirmed with {confirmations} confirmations")
    elif confirmations > 0:
        # Already marked as successful, just update confirmation count
        logger.debug(f"Updated confirmations for {txid[:16]}...: {confirmations}")

    # Rewrite the entire history file
    try:
        fieldnames = _get_fieldnames()
        with open(history_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for entry in entries:
                writer.writerow({field.name: getattr(entry, field.name) for field in fields(entry)})
        return True
    except Exception as e:
        logger.error(f"Failed to update history: {e}")
        return False

Update a transaction's confirmation status in the history file.

This function rewrites the entire CSV file with the updated entry. If confirmations > 0, marks the transaction as successful.

Note: This is the synchronous version. For makers who want automatic peer count detection, use update_transaction_confirmation_with_detection().

Args

txid
Transaction ID to update
confirmations
Current number of confirmations
data_dir
Optional data directory

Returns

True if transaction was found and updated, False otherwise

async def update_transaction_confirmation_with_detection(txid: str,
confirmations: int,
backend: BlockchainBackend | Any | None = None,
data_dir: Path | None = None) ‑> bool
Expand source code
async def update_transaction_confirmation_with_detection(
    txid: str,
    confirmations: int,
    backend: BlockchainBackend | Any | None = None,
    data_dir: Path | None = None,
) -> bool:
    """
    Update transaction confirmation and detect peer count for makers.

    This async version can detect the CoinJoin peer count by analyzing the
    transaction outputs when it confirms. This is useful for makers who don't
    know the peer count during the CoinJoin.

    The work is done in two phases so that the history file is never
    rewritten from data read before an ``await``:

    1. If detection applies (confirmations > 0, a backend is given, and the
       entry is an unconfirmed maker entry with unknown peer count and a
       positive amount), the peer count is fetched. Other coroutines may run
       and append history entries during this step.
    2. The history is read again and the update is applied and written back
       without awaiting in between, so entries appended during step 1 are
       kept.

    An empty txid is rejected up front, because pending entries without a
    discovered txid are stored with an empty txid and would otherwise match.

    Args:
        txid: Transaction ID to update
        confirmations: Current number of confirmations
        backend: Blockchain backend for fetching transaction (optional, for peer detection)
        data_dir: Optional data directory

    Returns:
        True if transaction was found and updated, False otherwise
    """
    if not txid:
        return False

    history_path = _get_history_path(data_dir)
    if not history_path.exists():
        return False

    # Phase 1: decide from a snapshot whether detection is needed, then await.
    detected_count: int | None = None
    if confirmations > 0 and backend is not None:
        snapshot = next((e for e in read_history(data_dir) if e.txid == txid), None)
        if snapshot is None:
            return False
        if (
            not snapshot.success
            and snapshot.role == "maker"
            and snapshot.peer_count is None
            and snapshot.cj_amount > 0
        ):
            detected_count = await detect_coinjoin_peer_count(backend, txid, snapshot.cj_amount)

    # Phase 2: fresh read, modify, write. No await from here on.
    entries = read_history(data_dir)
    target = next((e for e in entries if e.txid == txid), None)
    if target is None:
        return False

    target.confirmations = confirmations
    if confirmations > 0 and not target.success:
        # Mark as successful on first confirmation
        target.success = True
        target.failure_reason = ""
        target.confirmed_at = datetime.now().isoformat()
        target.completed_at = target.confirmed_at
        logger.info(f"Transaction {txid[:16]}... confirmed with {confirmations} confirmations")
    elif confirmations > 0:
        # Already marked as successful, just update confirmation count
        logger.debug(f"Updated confirmations for {txid[:16]}...: {confirmations}")

    # Store the detected count unless the entry received one in the meantime
    if detected_count is not None and target.role == "maker" and target.peer_count is None:
        target.peer_count = detected_count
        logger.info(f"Detected {detected_count} participants in CoinJoin {txid[:16]}...")

    # Rewrite the entire history file
    try:
        fieldnames = _get_fieldnames()
        with open(history_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for entry in entries:
                writer.writerow({field.name: getattr(entry, field.name) for field in fields(entry)})
        return True
    except Exception as e:
        logger.error(f"Failed to update history: {e}")
        return False

Update transaction confirmation and detect peer count for makers.

This async version can detect the CoinJoin peer count by analyzing the transaction outputs when it confirms. This is useful for makers who don't know the peer count during the CoinJoin.

Args

txid
Transaction ID to update
confirmations
Current number of confirmations
backend
Blockchain backend for fetching transaction (optional, for peer detection)
data_dir
Optional data directory

Returns

True if transaction was found and updated, False otherwise

def update_transaction_peer_count(txid: str, peer_count: int, data_dir: Path | None = None) ‑> bool
Expand source code
def update_transaction_peer_count(
    txid: str,
    peer_count: int,
    data_dir: Path | None = None,
) -> bool:
    """
    Update a transaction's peer count in the history file.

    This is used for makers to update the peer count after detecting it
    from the confirmed transaction's equal-amount outputs.

    Only an entry whose peer count is still unknown (None) is updated; an
    already recorded peer count is never overwritten. The whole history file
    is rewritten after the change. An empty txid is rejected up front, because
    pending entries without a discovered txid are stored with an empty txid
    and would otherwise match.

    Args:
        txid: Transaction ID to update
        peer_count: Detected peer count
        data_dir: Optional data directory

    Returns:
        True if transaction was found and updated, False otherwise
    """
    if not txid:
        return False

    history_path = _get_history_path(data_dir)
    if not history_path.exists():
        return False

    entries = read_history(data_dir)

    # First entry (most recent first) with this txid and no peer count yet
    target = next((e for e in entries if e.txid == txid and e.peer_count is None), None)
    if target is None:
        return False

    target.peer_count = peer_count
    logger.info(f"Updated peer count for tx {txid[:16]}... to {peer_count}")

    # Rewrite the entire history file
    try:
        fieldnames = _get_fieldnames()
        with open(history_path, "w", newline="", encoding="utf-8") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            for entry in entries:
                writer.writerow({field.name: getattr(entry, field.name) for field in fields(entry)})
        return True
    except Exception as e:
        logger.error(f"Failed to update history: {e}")
        return False

Update a transaction's peer count in the history file.

This is used for makers to update the peer count after detecting it from the confirmed transaction's equal-amount outputs.

Args

txid
Transaction ID to update
peer_count
Detected peer count
data_dir
Optional data directory

Returns

True if transaction was found and updated, False otherwise

Classes

class TransactionHistoryEntry (*args: Any, **kwargs: Any)
Expand source code
@dataclass
class TransactionHistoryEntry:
    """
    A single CoinJoin transaction record.

    One instance corresponds to one row of the history CSV: rows are built by
    reading every dataclass field by name. Only ``timestamp`` is required;
    all other fields have defaults. Monetary amounts are in satoshis.
    """

    # Timestamps
    timestamp: str  # ISO format - when the entry was created
    completed_at: str = ""  # ISO format - empty while the entry is still pending

    # Role and outcome
    role: Literal["maker", "taker"] = "taker"
    success: bool = True  # entry factories in this module create pending entries with False
    failure_reason: str = ""  # "Pending confirmation" while pending, "" once confirmed

    # Confirmation tracking
    confirmations: int = 0  # Number of confirmations (0 = unconfirmed/pending)
    confirmed_at: str = ""  # ISO format - when first confirmation was seen

    # Core transaction data
    txid: str = ""  # empty when the txid is not known yet
    cj_amount: int = 0  # satoshis

    # Peer information
    peer_count: int | None = None  # None for makers (unknown), count for takers
    counterparty_nicks: str = ""  # comma-separated

    # Fee information (in satoshis)
    fee_received: int = 0  # Only for makers - cjfee earned
    txfee_contribution: int = 0  # Mining fee contribution
    total_maker_fees_paid: int = 0  # Only for takers
    mining_fee_paid: int = 0  # Only for takers

    # Net profit/cost
    net_fee: int = 0  # Positive = profit, negative = cost

    # UTXO/address info
    source_mixdepth: int = 0
    destination_address: str = ""  # CoinJoin output address
    change_address: str = ""  # Change output address (must also be blacklisted!)
    utxos_used: str = ""  # comma-separated txid:vout

    # Broadcast method
    broadcast_method: str = ""  # "self", "maker:<nick>", etc.

    # Network
    network: str = "mainnet"

A single CoinJoin transaction record.

Instance variables

var broadcast_method : str

How the transaction was broadcast, e.g. "self" or "maker:<nick>".

var change_address : str

Change output address. Like the destination address, it must never be reused.

var cj_amount : int

CoinJoin amount in satoshis.

var completed_at : str

ISO-format completion timestamp; empty while the entry is still pending.

var confirmations : int

Number of confirmations (0 = unconfirmed/pending).

var confirmed_at : str

ISO-format timestamp of when the first confirmation was seen.

var counterparty_nicks : str

Comma-separated nicks of the counterparties.

var destination_address : str

CoinJoin destination (output) address.

var failure_reason : str

Reason for failure; "Pending confirmation" while awaiting confirmation, empty once confirmed.

var fee_received : int

CoinJoin fee earned, in satoshis (makers only).

var mining_fee_paid : int

Mining fee paid, in satoshis (takers only).

var net_fee : int

Net profit or cost in satoshis (positive = profit, negative = cost).

var network : str

Network name (default "mainnet").

var peer_count : int | None

Number of CoinJoin peers; None when unknown (makers do not know it when the entry is created).

var role : Literal['maker', 'taker']

Role played in the CoinJoin: "maker" or "taker".

var source_mixdepth : int

Source mixdepth.

var success : bool

Whether the CoinJoin succeeded; False while the transaction is pending confirmation.

var timestamp : str

ISO-format timestamp of when the entry was created.

var total_maker_fees_paid : int

Total fees paid to makers, in satoshis (takers only).

var txfee_contribution : int

Mining fee contribution, in satoshis.

var txid : str

Transaction ID; empty when not yet known.

var utxos_used : str

Comma-separated list of the inputs used, each formatted as txid:vout.