Module orderbook_watcher.aggregator

Orderbook aggregation logic across multiple directory nodes.

Classes

class DirectoryNodeStatus (node_id: str,
tracking_started: datetime | None = None,
grace_period_seconds: int = 0)
Expand source code
class DirectoryNodeStatus:
    def __init__(
        self,
        node_id: str,
        tracking_started: datetime | None = None,
        grace_period_seconds: int = 0,
    ) -> None:
        self.node_id = node_id
        self.connected = False
        self.last_connected: datetime | None = None
        self.last_disconnected: datetime | None = None
        self.connection_attempts = 0
        self.successful_connections = 0
        self.total_uptime_seconds = 0.0
        self.current_session_start: datetime | None = None
        self.tracking_started = tracking_started or datetime.now(UTC)
        self.grace_period_seconds = grace_period_seconds

    def mark_connected(self, current_time: datetime | None = None) -> None:
        now = current_time or datetime.now(UTC)
        self.connected = True
        self.last_connected = now
        self.current_session_start = now
        self.successful_connections += 1

    def mark_disconnected(self, current_time: datetime | None = None) -> None:
        now = current_time or datetime.now(UTC)
        if self.connected and self.current_session_start:
            # Only count uptime after grace period
            grace_end_ts = self.tracking_started.timestamp() + self.grace_period_seconds
            session_start_ts = self.current_session_start.timestamp()
            now_ts = now.timestamp()

            # Calculate the actual uptime to record (only after grace period)
            if now_ts > grace_end_ts:
                # Some or all of the session is after grace period
                counted_start = max(session_start_ts, grace_end_ts)
                session_duration = now_ts - counted_start
                self.total_uptime_seconds += max(0, session_duration)

        self.connected = False
        self.last_disconnected = now
        self.current_session_start = None

    def get_uptime_percentage(self, current_time: datetime | None = None) -> float:
        if not self.tracking_started:
            return 0.0
        now = current_time or datetime.now(UTC)
        elapsed = (now - self.tracking_started).total_seconds()

        # If we're still in grace period, return 100% uptime
        if elapsed < self.grace_period_seconds:
            return 100.0

        # Calculate total time excluding grace period
        total_time = elapsed - self.grace_period_seconds
        if total_time == 0:
            return 0.0

        # Calculate uptime, but only count time after grace period ends
        grace_end = self.tracking_started.timestamp() + self.grace_period_seconds
        current_uptime = self.total_uptime_seconds

        if self.connected and self.current_session_start:
            # Only count uptime after grace period ended
            session_start_ts = self.current_session_start.timestamp()
            if session_start_ts < grace_end:
                # Session started during grace period, only count time after grace ended
                uptime_duration = now.timestamp() - grace_end
            else:
                # Session started after grace period
                uptime_duration = (now - self.current_session_start).total_seconds()
            current_uptime += max(0, uptime_duration)

        return (current_uptime / total_time) * 100.0

    def to_dict(self, current_time: datetime | None = None) -> dict[str, Any]:
        return {
            "node_id": self.node_id,
            "connected": self.connected,
            "last_connected": self.last_connected.isoformat() if self.last_connected else None,
            "last_disconnected": self.last_disconnected.isoformat()
            if self.last_disconnected
            else None,
            "connection_attempts": self.connection_attempts,
            "successful_connections": self.successful_connections,
            "uptime_percentage": round(self.get_uptime_percentage(current_time), 2),
            "tracking_started": self.tracking_started.isoformat()
            if self.tracking_started
            else None,
        }

Methods

def get_uptime_percentage(self, current_time: datetime | None = None) ‑> float
Expand source code
def get_uptime_percentage(self, current_time: datetime | None = None) -> float:
    if not self.tracking_started:
        return 0.0
    now = current_time or datetime.now(UTC)
    elapsed = (now - self.tracking_started).total_seconds()

    # If we're still in grace period, return 100% uptime
    if elapsed < self.grace_period_seconds:
        return 100.0

    # Calculate total time excluding grace period
    total_time = elapsed - self.grace_period_seconds
    if total_time == 0:
        return 0.0

    # Calculate uptime, but only count time after grace period ends
    grace_end = self.tracking_started.timestamp() + self.grace_period_seconds
    current_uptime = self.total_uptime_seconds

    if self.connected and self.current_session_start:
        # Only count uptime after grace period ended
        session_start_ts = self.current_session_start.timestamp()
        if session_start_ts < grace_end:
            # Session started during grace period, only count time after grace ended
            uptime_duration = now.timestamp() - grace_end
        else:
            # Session started after grace period
            uptime_duration = (now - self.current_session_start).total_seconds()
        current_uptime += max(0, uptime_duration)

    return (current_uptime / total_time) * 100.0
def mark_connected(self, current_time: datetime | None = None) ‑> None
Expand source code
def mark_connected(self, current_time: datetime | None = None) -> None:
    now = current_time or datetime.now(UTC)
    self.connected = True
    self.last_connected = now
    self.current_session_start = now
    self.successful_connections += 1
def mark_disconnected(self, current_time: datetime | None = None) ‑> None
Expand source code
def mark_disconnected(self, current_time: datetime | None = None) -> None:
    now = current_time or datetime.now(UTC)
    if self.connected and self.current_session_start:
        # Only count uptime after grace period
        grace_end_ts = self.tracking_started.timestamp() + self.grace_period_seconds
        session_start_ts = self.current_session_start.timestamp()
        now_ts = now.timestamp()

        # Calculate the actual uptime to record (only after grace period)
        if now_ts > grace_end_ts:
            # Some or all of the session is after grace period
            counted_start = max(session_start_ts, grace_end_ts)
            session_duration = now_ts - counted_start
            self.total_uptime_seconds += max(0, session_duration)

    self.connected = False
    self.last_disconnected = now
    self.current_session_start = None
def to_dict(self, current_time: datetime | None = None) ‑> dict[str, typing.Any]
Expand source code
def to_dict(self, current_time: datetime | None = None) -> dict[str, Any]:
    return {
        "node_id": self.node_id,
        "connected": self.connected,
        "last_connected": self.last_connected.isoformat() if self.last_connected else None,
        "last_disconnected": self.last_disconnected.isoformat()
        if self.last_disconnected
        else None,
        "connection_attempts": self.connection_attempts,
        "successful_connections": self.successful_connections,
        "uptime_percentage": round(self.get_uptime_percentage(current_time), 2),
        "tracking_started": self.tracking_started.isoformat()
        if self.tracking_started
        else None,
    }
class OrderbookAggregator (directory_nodes: list[tuple[str, int]],
network: str,
socks_host: str = '127.0.0.1',
socks_port: int = 9050,
timeout: float = 30.0,
mempool_api_url: str = 'https://mempool.space/api',
max_retry_attempts: int = 3,
retry_delay: float = 5.0,
max_message_size: int = 2097152,
uptime_grace_period: int = 60)
Expand source code
class OrderbookAggregator:
    def __init__(
        self,
        directory_nodes: list[tuple[str, int]],
        network: str,
        socks_host: str = "127.0.0.1",
        socks_port: int = 9050,
        timeout: float = 30.0,
        mempool_api_url: str = "https://mempool.space/api",
        max_retry_attempts: int = 3,
        retry_delay: float = 5.0,
        max_message_size: int = 2097152,
        uptime_grace_period: int = 60,
    ) -> None:
        """Store the configuration and create empty aggregation state.

        ``asyncio.create_task`` is called here to start the SOCKS proxy
        self-test, so the aggregator has to be constructed while an event loop
        is running. One ``DirectoryNodeStatus`` is created per configured
        directory node, each using ``uptime_grace_period`` as its grace period.
        """
        # Settings are kept exactly as passed in.
        self.directory_nodes = directory_nodes
        self.network = network
        self.socks_host, self.socks_port = socks_host, socks_port
        self.timeout = timeout
        self.mempool_api_url = mempool_api_url
        self.max_retry_attempts, self.retry_delay = max_retry_attempts, retry_delay
        self.max_message_size = max_message_size
        self.uptime_grace_period = uptime_grace_period

        # The mempool client uses the same SOCKS proxy, with a fixed 60 s
        # timeout that is independent of ``timeout``.
        socks_proxy = f"socks5://{socks_host}:{socks_port}"
        logger.info(f"Configuring MempoolAPI with SOCKS proxy: {socks_proxy}")
        self.mempool_api = MempoolAPI(
            base_url=mempool_api_url,
            socks_proxy=socks_proxy,
            timeout=60.0,
        )

        self._socks_test_task = asyncio.create_task(self._test_socks_connection())

        # Runtime state, all empty until listening or fetching starts.
        self.current_orderbook: OrderBook = OrderBook()
        self._lock = asyncio.Lock()
        self.clients: dict[str, DirectoryClient] = {}
        self.listener_tasks: list[asyncio.Task[Any]] = []
        self._bond_calculation_task: asyncio.Task[Any] | None = None
        self._bond_queue: asyncio.Queue[OrderBook] = asyncio.Queue()
        self._bond_cache: dict[str, FidelityBond] = {}
        self._last_offers_hash: int = 0
        self._mempool_semaphore = asyncio.Semaphore(5)
        self.node_statuses: dict[str, DirectoryNodeStatus] = {}
        self._retry_tasks: list[asyncio.Task[Any]] = []

        # Status records are keyed by "<onion_address>:<port>".
        for onion_address, port in directory_nodes:
            self.node_statuses[f"{onion_address}:{port}"] = DirectoryNodeStatus(
                f"{onion_address}:{port}",
                grace_period_seconds=uptime_grace_period,
            )

    def _handle_client_disconnect(self, onion_address: str, port: int) -> None:
        node_id = f"{onion_address}:{port}"
        client = self.clients.pop(node_id, None)
        if client:
            client.stop()
        self._schedule_reconnect(onion_address, port)

    def _schedule_reconnect(self, onion_address: str, port: int) -> None:
        """Start a retry task for a node unless one is already pending.

        Finished tasks are pruned from ``self._retry_tasks`` first. Pending
        retries are recognised by their task name, ``"retry:<node_id>"``.
        """
        node_id = f"{onion_address}:{port}"
        wanted_name = f"retry:{node_id}"

        # Forget retries that have already finished.
        still_pending = [task for task in self._retry_tasks if not task.done()]
        self._retry_tasks = still_pending

        for task in still_pending:
            if task.get_name() == wanted_name:
                logger.debug(f"Retry already scheduled for {node_id}")
                return

        new_task = asyncio.create_task(self._retry_failed_connection(onion_address, port))
        new_task.set_name(wanted_name)
        self._retry_tasks.append(new_task)
        logger.info(f"Scheduled retry task for {node_id}")

    async def fetch_from_directory(
        self, onion_address: str, port: int
    ) -> tuple[list[Offer], list[FidelityBond], str]:
        """Fetch offers and bonds from one directory over a one-shot client.

        Every returned offer and bond has ``directory_node`` set to the node
        id. Any exception is logged and turned into two empty lists; the
        client is closed in all cases.

        Returns:
            ``(offers, bonds, node_id)``.
        """
        node_id = f"{onion_address}:{port}"
        logger.info(f"Fetching orderbook from directory: {node_id}")

        connection_options = {
            "socks_host": self.socks_host,
            "socks_port": self.socks_port,
            "timeout": self.timeout,
            "max_message_size": self.max_message_size,
        }
        client = DirectoryClient(onion_address, port, self.network, **connection_options)

        try:
            await client.connect()
            offers, bonds = await client.fetch_orderbooks()

            # Tag every item with the directory it came from.
            for offer in offers:
                offer.directory_node = node_id
            for bond in bonds:
                bond.directory_node = node_id
        except Exception as e:
            logger.error(f"Failed to fetch from directory {node_id}: {e}")
            return [], [], node_id
        else:
            return offers, bonds, node_id
        finally:
            await client.close()

    async def update_orderbook(self) -> OrderBook:
        """Rebuild the orderbook from one-shot fetches of every directory.

        All directories are fetched concurrently. Results that are exceptions
        are logged and skipped, and results with neither offers nor bonds are
        ignored. Bond values are calculated before the new orderbook replaces
        ``self.current_orderbook`` under the lock.

        Returns:
            The newly built orderbook.
        """
        fetches = []
        for onion_address, port in self.directory_nodes:
            fetches.append(self.fetch_from_directory(onion_address, port))
        results = await asyncio.gather(*fetches, return_exceptions=True)

        new_orderbook = OrderBook(timestamp=datetime.now(UTC))

        for result in results:
            if isinstance(result, BaseException):
                logger.error(f"Directory fetch failed: {result}")
                continue

            offers, bonds, node_id = result
            if not (offers or bonds):
                # Nothing came back from this directory.
                continue
            new_orderbook.add_offers(offers, node_id)
            new_orderbook.add_fidelity_bonds(bonds, node_id)

        await self._calculate_bond_values(new_orderbook)

        async with self._lock:
            self.current_orderbook = new_orderbook

        logger.info(
            f"Updated orderbook: {len(new_orderbook.offers)} offers, "
            f"{len(new_orderbook.fidelity_bonds)} bonds from "
            f"{len(new_orderbook.directory_nodes)} directory nodes"
        )

        return new_orderbook

    async def get_orderbook(self) -> OrderBook:
        async with self._lock:
            return self.current_orderbook

    async def _background_bond_calculator(self) -> None:
        """Process queued orderbooks forever, filling in bond values.

        For each orderbook taken from ``self._bond_queue`` the bond values are
        calculated and then copied onto every offer whose
        ``fidelity_bond_data`` names a bond of the same counterparty and txid.
        Exceptions are logged and the loop keeps running.
        """
        while True:
            try:
                orderbook = await self._bond_queue.get()
                await self._calculate_bond_values(orderbook)

                for offer in orderbook.offers:
                    if not offer.fidelity_bond_data:
                        continue
                    # First bond of this counterparty with the declared txid.
                    first_match = next(
                        (
                            b
                            for b in orderbook.fidelity_bonds
                            if b.counterparty == offer.counterparty
                            and b.utxo_txid == offer.fidelity_bond_data.get("utxo_txid")
                        ),
                        None,
                    )
                    if first_match is None or first_match.bond_value is None:
                        continue
                    offer.fidelity_bond_value = first_match.bond_value

                logger.debug("Background bond calculation completed")
            except Exception as e:
                logger.error(f"Error in background bond calculator: {e}")

    async def _periodic_directory_connection_status(self) -> None:
        """Log a summary of directory connectivity at a fixed interval.

        The first summary is logged after 5 minutes, then one every 10
        minutes. With at least one disconnected node a warning lists up to
        five of them; otherwise an info line lists up to five connected nodes
        with their uptime percentage. Cancellation inside the loop is logged
        and ends the task normally.
        """
        # Leave time for the initial connections before the first report.
        await asyncio.sleep(300)

        while True:
            try:
                total_servers = len(self.directory_nodes)
                connected_nodes: list[str] = []
                disconnected_nodes: list[str] = []

                for node_id, status in self.node_statuses.items():
                    if not status.connected:
                        disconnected_nodes.append(node_id)
                        continue
                    uptime_pct = status.get_uptime_percentage()
                    connected_nodes.append(f"{node_id} ({uptime_pct:.1f}% uptime)")

                connected_count = len(connected_nodes)

                if not disconnected_nodes:
                    connected_str = ", ".join(connected_nodes[:5])
                    if len(connected_nodes) > 5:
                        connected_str += f", ... and {len(connected_nodes) - 5} more"
                    logger.info(
                        f"Directory connection status: {connected_count}/{total_servers} connected "
                        f"[{connected_str}]"
                    )
                else:
                    disconnected_str = ", ".join(disconnected_nodes[:5])
                    if len(disconnected_nodes) > 5:
                        disconnected_str += f", ... and {len(disconnected_nodes) - 5} more"
                    logger.warning(
                        f"Directory connection status: {connected_count}/{total_servers} connected. "
                        f"Disconnected: [{disconnected_str}]"
                    )

                # Next report in 10 minutes.
                await asyncio.sleep(600)

            except asyncio.CancelledError:
                logger.info("Directory connection status task cancelled")
                break
            except Exception as e:
                logger.error(f"Error in directory connection status task: {e}")
                await asyncio.sleep(600)

        logger.info("Directory connection status task stopped")

    async def _connect_to_node(self, onion_address: str, port: int) -> DirectoryClient | None:
        """Open a persistent client to one directory node.

        Every call increments the node's ``connection_attempts``. On success
        the node status is marked connected and the client is returned. On
        failure the client is closed, the status is marked disconnected, a
        reconnect is scheduled and ``None`` is returned, so callers do not
        need to schedule a retry themselves for a ``None`` result.

        Raises:
            KeyError: If the node is not present in ``self.node_statuses``.
        """
        node_id = f"{onion_address}:{port}"
        status = self.node_statuses[node_id]
        status.connection_attempts += 1

        logger.info(f"Connecting to directory: {node_id}")

        # Callback handed to the client: updates the uptime bookkeeping first,
        # then drops the client from self.clients and schedules a reconnect.
        def on_disconnect() -> None:
            logger.info(f"Directory node {node_id} disconnected")
            status.mark_disconnected()
            self._handle_client_disconnect(onion_address, port)

        client = DirectoryClient(
            onion_address,
            port,
            self.network,
            socks_host=self.socks_host,
            socks_port=self.socks_port,
            timeout=self.timeout,
            max_message_size=self.max_message_size,
            on_disconnect=on_disconnect,
        )

        try:
            await client.connect()
            status.mark_connected()
            logger.info(f"Successfully connected to directory: {node_id}")
            return client

        except Exception as e:
            logger.warning(f"Connection to directory {node_id} failed: {e}")
            # NOTE(review): if close() itself raises, the two lines below are
            # skipped and the exception propagates to the caller.
            await client.close()
            status.mark_disconnected()
            self._schedule_reconnect(onion_address, port)
            return None

    async def _retry_failed_connection(self, onion_address: str, port: int) -> None:
        """Retry connecting to one node every 60 seconds until it is connected.

        The loop ends when the node already has an entry in ``self.clients``
        or when a connection attempt succeeds. On success the client is
        registered and a listener task is started for it.
        """
        node_id = f"{onion_address}:{port}"

        while True:
            await asyncio.sleep(60)

            if node_id in self.clients:
                logger.debug(f"Node {node_id} already connected, stopping retry")
                return

            logger.info(f"Retrying connection to directory {node_id}...")
            client = await self._connect_to_node(onion_address, port)
            if not client:
                # Still unreachable; wait for the next round.
                continue

            self.clients[node_id] = client
            listener = asyncio.create_task(client.listen_continuously())
            self.listener_tasks.append(listener)
            logger.info(f"Successfully reconnected to directory: {node_id}")
            return

    async def start_continuous_listening(self) -> None:
        """Connect to every directory node and start the background tasks.

        Starts the background bond calculator and the periodic status logger,
        then connects to all configured nodes concurrently. Each node that
        connected gets a listener task; each node that did not is handed to
        ``_schedule_reconnect``.
        """
        logger.info("Starting continuous listening on all directory nodes")

        self._bond_calculation_task = asyncio.create_task(self._background_bond_calculator())

        # Start periodic directory connection status logging task
        status_task = asyncio.create_task(self._periodic_directory_connection_status())
        self.listener_tasks.append(status_task)

        connection_tasks = [
            self._connect_to_node(onion_address, port)
            for onion_address, port in self.directory_nodes
        ]

        clients = await asyncio.gather(*connection_tasks, return_exceptions=True)

        for (onion_address, port), result in zip(self.directory_nodes, clients, strict=True):
            node_id = f"{onion_address}:{port}"

            if isinstance(result, BaseException):
                logger.error(f"Connection to {node_id} raised exception: {result}")
                self._schedule_reconnect(onion_address, port)
            elif result is not None:
                self.clients[node_id] = result
                task = asyncio.create_task(result.listen_continuously())
                self.listener_tasks.append(task)
                logger.info(f"Started listener task for {node_id}")
            else:
                # _connect_to_node has already scheduled a named retry before
                # returning None. Going through _schedule_reconnect, which
                # dedupes by task name, keeps it at one retry loop per node; a
                # second loop could reconnect in parallel and overwrite the
                # registered client.
                self._schedule_reconnect(onion_address, port)

    async def stop_listening(self) -> None:
        """Shut down all background tasks and close every client.

        Order: cancel and await the bond calculator, cancel and await the
        retry tasks, stop the clients, cancel and await the listener tasks,
        then mark each node disconnected and close its client. Finally the
        client, listener and retry collections are emptied.
        """
        logger.info("Stopping all directory listeners")

        if self._bond_calculation_task:
            self._bond_calculation_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._bond_calculation_task

        # Retries are cancelled before the clients are stopped.
        # NOTE(review): a disconnect callback fired later in this method would
        # schedule a new retry that is dropped, not cancelled, by the clear()
        # at the end - confirm DirectoryClient does not call on_disconnect
        # from stop()/close().
        for task in self._retry_tasks:
            task.cancel()

        if self._retry_tasks:
            await asyncio.gather(*self._retry_tasks, return_exceptions=True)

        for client in self.clients.values():
            client.stop()

        for task in self.listener_tasks:
            task.cancel()

        # return_exceptions=True keeps cancellation of the listeners from
        # propagating out of this method.
        if self.listener_tasks:
            await asyncio.gather(*self.listener_tasks, return_exceptions=True)

        # NOTE(review): self.clients is iterated across an await here; it must
        # not be mutated (e.g. by _handle_client_disconnect) while close() runs.
        for node_id, client in self.clients.items():
            self.node_statuses[node_id].mark_disconnected()
            await client.close()

        self.clients.clear()
        self.listener_tasks.clear()
        self._retry_tasks.clear()

    async def get_live_orderbook(self, calculate_bonds: bool = True) -> OrderBook:
        """Build an orderbook from the current state of all connected clients.

        Offers and bonds are read from every client in ``self.clients`` and
        tagged with their node id. Bonds are de-duplicated by
        ``"<utxo_txid>:<utxo_vout>"``, keeping the first occurrence.

        When ``calculate_bonds`` is true, previously cached values are copied
        onto the bonds, remaining values are calculated, the cache is
        refreshed, and bond values are linked to offers:

        * an offer with ``fidelity_bond_data`` takes the value of the first
          bond with the same counterparty and txid;
        * an offer without it receives the data and value of the same
          counterparty's highest-valued bond, if that bond carries data.

        When ``calculate_bonds`` is false, no values are calculated and no
        linking is done.

        Returns:
            The assembled orderbook.
        """
        orderbook = OrderBook(timestamp=datetime.now(UTC))

        for node_id, client in self.clients.items():
            offers = client.get_current_offers()
            bonds = client.get_current_bonds()
            logger.debug(f"Node {node_id}: {len(offers)} offers, {len(bonds)} bonds")
            for offer in offers:
                offer.directory_node = node_id
            for bond in bonds:
                bond.directory_node = node_id
            orderbook.add_offers(offers, node_id)
            orderbook.add_fidelity_bonds(bonds, node_id)

        # De-duplicate by UTXO; the first announcement of each bond wins. The
        # same keys are reused below for the value cache.
        unique_bonds: dict[str, FidelityBond] = {}
        for bond in orderbook.fidelity_bonds:
            unique_bonds.setdefault(f"{bond.utxo_txid}:{bond.utxo_vout}", bond)
        orderbook.fidelity_bonds = list(unique_bonds.values())

        if not calculate_bonds:
            return orderbook

        # Reuse cached values so only new bonds need a lookup.
        cached_count = 0
        for cache_key, bond in unique_bonds.items():
            cached_bond = self._bond_cache.get(cache_key)
            if cached_bond is None:
                continue
            bond.bond_value = cached_bond.bond_value
            bond.amount = cached_bond.amount
            bond.utxo_confirmation_timestamp = cached_bond.utxo_confirmation_timestamp
            cached_count += 1

        if cached_count > 0:
            logger.debug(
                f"Loaded {cached_count}/{len(orderbook.fidelity_bonds)} bonds from cache"
            )

        await self._calculate_bond_values(orderbook)

        for cache_key, bond in unique_bonds.items():
            if bond.bond_value is not None:
                self._bond_cache[cache_key] = bond

        # Index the bonds once so linking is a lookup per offer instead of a
        # scan over all bonds. Assumes counterparty and txid are hashable
        # (they are used as strings elsewhere in this method).
        bond_by_owner_and_txid = {}
        bonds_by_owner = {}
        for bond in orderbook.fidelity_bonds:
            bond_by_owner_and_txid.setdefault((bond.counterparty, bond.utxo_txid), bond)
            bonds_by_owner.setdefault(bond.counterparty, []).append(bond)

        for offer in orderbook.offers:
            if offer.fidelity_bond_data:
                # The offer already names its bond: copy that bond's value.
                declared = bond_by_owner_and_txid.get(
                    (offer.counterparty, offer.fidelity_bond_data.get("utxo_txid"))
                )
                if declared is not None and declared.bond_value is not None:
                    offer.fidelity_bond_value = declared.bond_value
                continue

            # The bond announcement arrived separately from the offer (e.g.
            # reference implementation makers responding to !orderbook
            # requests): fall back to the counterparty's best bond.
            candidates = bonds_by_owner.get(offer.counterparty)
            if not candidates:
                continue
            # Highest value wins; uncalculated values count as 0 and ties keep
            # the earliest bond.
            bond = max(
                candidates,
                key=lambda b: b.bond_value if b.bond_value is not None else 0,
            )
            if not bond.fidelity_bond_data:
                # Nothing to attach, so nothing is linked or logged.
                continue
            offer.fidelity_bond_data = bond.fidelity_bond_data
            if bond.bond_value is not None:
                offer.fidelity_bond_value = bond.bond_value
            logger.debug(
                f"Linked standalone bond from {bond.counterparty} "
                f"(txid={bond.utxo_txid[:16]}...) to offer oid={offer.oid}"
            )

        return orderbook

    async def _calculate_bond_value_single(
        self, bond: FidelityBond, current_time: int
    ) -> FidelityBond:
        """Look up one bond's UTXO and fill in its value, amount and timestamp.

        Bonds that already have a ``bond_value`` are returned untouched. The
        lookup runs under ``self._mempool_semaphore``. The bond is left
        unchanged when the transaction is missing or unconfirmed, when
        ``utxo_vout`` is not a valid output index, or when any exception
        occurs (it is logged).

        Args:
            bond: Bond to update in place.
            current_time: Unix timestamp passed to the value calculation and
                used as the confirmation time when the transaction status
                carries no block time.

        Returns:
            The same ``bond`` object.
        """
        if bond.bond_value is not None:
            return bond

        async with self._mempool_semaphore:
            try:
                tx_data = await self.mempool_api.get_transaction(bond.utxo_txid)
                if not tx_data or not tx_data.status.confirmed:
                    logger.debug(f"Bond {bond.utxo_txid}:{bond.utxo_vout} not confirmed")
                    return bond

                # Negative indices are rejected too: Python would otherwise
                # read an output from the end of the list.
                if not 0 <= bond.utxo_vout < len(tx_data.vout):
                    logger.warning(
                        f"Invalid vout {bond.utxo_vout} for tx {bond.utxo_txid} "
                        f"(only {len(tx_data.vout)} outputs)"
                    )
                    return bond

                utxo = tx_data.vout[bond.utxo_vout]
                amount = utxo.value
                confirmation_time = tx_data.status.block_time or current_time

                bond_value = calculate_timelocked_fidelity_bond_value(
                    amount, confirmation_time, bond.locktime, current_time
                )

                bond.bond_value = bond_value
                bond.amount = amount
                bond.utxo_confirmation_timestamp = confirmation_time

                # Timezone-aware conversion; datetime.utcfromtimestamp is
                # deprecated since Python 3.12.
                logger.debug(
                    f"Bond {bond.counterparty}: value={bond_value}, "
                    f"amount={amount}, locktime={datetime.fromtimestamp(bond.locktime, UTC)}, "
                    f"confirmed={datetime.fromtimestamp(confirmation_time, UTC)}"
                )

            except Exception as e:
                logger.error(f"Failed to calculate bond value for {bond.utxo_txid}: {e}")
                logger.debug(
                    f"Bond data: txid={bond.utxo_txid}, vout={bond.utxo_vout}, amount={bond.amount}"
                )

        return bond

    async def _calculate_bond_values(self, orderbook: OrderBook) -> None:
        current_time = int(datetime.now(UTC).timestamp())

        tasks = [
            self._calculate_bond_value_single(bond, current_time)
            for bond in orderbook.fidelity_bonds
        ]
        await asyncio.gather(*tasks, return_exceptions=True)

    async def _test_socks_connection(self) -> None:
        """Check the mempool API connection once and log the outcome.

        Nothing is raised to the caller: a failed test and an exception are
        both reported through the logger only.
        """
        try:
            reachable = await self.mempool_api.test_connection()
            if not reachable:
                logger.warning(
                    "SOCKS proxy connection test failed - bond value calculation may not work"
                )
            else:
                logger.info("SOCKS proxy connection test successful")
        except Exception as e:
            logger.error(f"SOCKS proxy connection test error: {e}")
            logger.warning("Bond value calculation may not work without SOCKS proxy")

Methods

async def fetch_from_directory(self, onion_address: str, port: int) ‑> tuple[list[Offer], list[FidelityBond], str]
Expand source code
async def fetch_from_directory(
    self, onion_address: str, port: int
) -> tuple[list[Offer], list[FidelityBond], str]:
    """Fetch offers and bonds from one directory over a one-shot client.

    Every returned offer and bond has ``directory_node`` set to the node id.
    Any exception is logged and turned into two empty lists; the client is
    closed in all cases.

    Returns:
        ``(offers, bonds, node_id)``.
    """
    node_id = f"{onion_address}:{port}"
    logger.info(f"Fetching orderbook from directory: {node_id}")

    connection_options = {
        "socks_host": self.socks_host,
        "socks_port": self.socks_port,
        "timeout": self.timeout,
        "max_message_size": self.max_message_size,
    }
    client = DirectoryClient(onion_address, port, self.network, **connection_options)

    try:
        await client.connect()
        offers, bonds = await client.fetch_orderbooks()

        # Tag every item with the directory it came from.
        for offer in offers:
            offer.directory_node = node_id
        for bond in bonds:
            bond.directory_node = node_id
    except Exception as e:
        logger.error(f"Failed to fetch from directory {node_id}: {e}")
        return [], [], node_id
    else:
        return offers, bonds, node_id
    finally:
        await client.close()
async def get_live_orderbook(self, calculate_bonds: bool = True) ‑> OrderBook
Expand source code
async def get_live_orderbook(self, calculate_bonds: bool = True) -> OrderBook:
    """Build an orderbook from the current state of all connected clients.

    Offers and bonds are read from every client in ``self.clients`` and tagged
    with their node id. Bonds are de-duplicated by
    ``"<utxo_txid>:<utxo_vout>"``, keeping the first occurrence.

    When ``calculate_bonds`` is true, previously cached values are copied onto
    the bonds, remaining values are calculated, the cache is refreshed, and
    bond values are linked to offers:

    * an offer with ``fidelity_bond_data`` takes the value of the first bond
      with the same counterparty and txid;
    * an offer without it receives the data and value of the same
      counterparty's highest-valued bond, if that bond carries data.

    When ``calculate_bonds`` is false, no values are calculated and no linking
    is done.

    Returns:
        The assembled orderbook.
    """
    orderbook = OrderBook(timestamp=datetime.now(UTC))

    for node_id, client in self.clients.items():
        offers = client.get_current_offers()
        bonds = client.get_current_bonds()
        logger.debug(f"Node {node_id}: {len(offers)} offers, {len(bonds)} bonds")
        for offer in offers:
            offer.directory_node = node_id
        for bond in bonds:
            bond.directory_node = node_id
        orderbook.add_offers(offers, node_id)
        orderbook.add_fidelity_bonds(bonds, node_id)

    # De-duplicate by UTXO; the first announcement of each bond wins. The same
    # keys are reused below for the value cache.
    unique_bonds: dict[str, FidelityBond] = {}
    for bond in orderbook.fidelity_bonds:
        unique_bonds.setdefault(f"{bond.utxo_txid}:{bond.utxo_vout}", bond)
    orderbook.fidelity_bonds = list(unique_bonds.values())

    if not calculate_bonds:
        return orderbook

    # Reuse cached values so only new bonds need a lookup.
    cached_count = 0
    for cache_key, bond in unique_bonds.items():
        cached_bond = self._bond_cache.get(cache_key)
        if cached_bond is None:
            continue
        bond.bond_value = cached_bond.bond_value
        bond.amount = cached_bond.amount
        bond.utxo_confirmation_timestamp = cached_bond.utxo_confirmation_timestamp
        cached_count += 1

    if cached_count > 0:
        logger.debug(
            f"Loaded {cached_count}/{len(orderbook.fidelity_bonds)} bonds from cache"
        )

    await self._calculate_bond_values(orderbook)

    for cache_key, bond in unique_bonds.items():
        if bond.bond_value is not None:
            self._bond_cache[cache_key] = bond

    # Index the bonds once so linking is a lookup per offer instead of a scan
    # over all bonds. Assumes counterparty and txid are hashable (they are
    # used as strings elsewhere in this function).
    bond_by_owner_and_txid = {}
    bonds_by_owner = {}
    for bond in orderbook.fidelity_bonds:
        bond_by_owner_and_txid.setdefault((bond.counterparty, bond.utxo_txid), bond)
        bonds_by_owner.setdefault(bond.counterparty, []).append(bond)

    for offer in orderbook.offers:
        if offer.fidelity_bond_data:
            # The offer already names its bond: copy that bond's value.
            declared = bond_by_owner_and_txid.get(
                (offer.counterparty, offer.fidelity_bond_data.get("utxo_txid"))
            )
            if declared is not None and declared.bond_value is not None:
                offer.fidelity_bond_value = declared.bond_value
            continue

        # The bond announcement arrived separately from the offer (e.g.
        # reference implementation makers responding to !orderbook requests):
        # fall back to the counterparty's best bond.
        candidates = bonds_by_owner.get(offer.counterparty)
        if not candidates:
            continue
        # Highest value wins; uncalculated values count as 0 and ties keep the
        # earliest bond.
        bond = max(
            candidates,
            key=lambda b: b.bond_value if b.bond_value is not None else 0,
        )
        if not bond.fidelity_bond_data:
            # Nothing to attach, so nothing is linked or logged.
            continue
        offer.fidelity_bond_data = bond.fidelity_bond_data
        if bond.bond_value is not None:
            offer.fidelity_bond_value = bond.bond_value
        logger.debug(
            f"Linked standalone bond from {bond.counterparty} "
            f"(txid={bond.utxo_txid[:16]}...) to offer oid={offer.oid}"
        )

    return orderbook
async def get_orderbook(self) ‑> OrderBook
Expand source code
async def get_orderbook(self) -> OrderBook:
    """Return the orderbook currently held by the aggregator.

    ``self.current_orderbook`` is read while holding ``self._lock``. The
    object itself is returned, not a copy.
    """
    async with self._lock:
        orderbook = self.current_orderbook
    return orderbook
async def start_continuous_listening(self) ‑> None
Expand source code
async def start_continuous_listening(self) -> None:
    logger.info("Starting continuous listening on all directory nodes")

    self._bond_calculation_task = asyncio.create_task(self._background_bond_calculator())

    # Start periodic directory connection status logging task
    status_task = asyncio.create_task(self._periodic_directory_connection_status())
    self.listener_tasks.append(status_task)

    connection_tasks = [
        self._connect_to_node(onion_address, port)
        for onion_address, port in self.directory_nodes
    ]

    clients = await asyncio.gather(*connection_tasks, return_exceptions=True)

    for (onion_address, port), result in zip(self.directory_nodes, clients, strict=True):
        node_id = f"{onion_address}:{port}"

        if isinstance(result, BaseException):
            logger.error(f"Connection to {node_id} raised exception: {result}")
            retry_task = asyncio.create_task(self._retry_failed_connection(onion_address, port))
            self._retry_tasks.append(retry_task)
            logger.info(f"Scheduled retry task for {node_id}")
        elif result is not None:
            self.clients[node_id] = result
            task = asyncio.create_task(result.listen_continuously())
            self.listener_tasks.append(task)
            logger.info(f"Started listener task for {node_id}")
        else:
            retry_task = asyncio.create_task(self._retry_failed_connection(onion_address, port))
            self._retry_tasks.append(retry_task)
            logger.info(f"Scheduled retry task for {node_id}")
async def stop_listening(self) ‑> None
Expand source code
async def stop_listening(self) -> None:
    """Cancel all background tasks and close every directory client.

    Shutdown order:

    1. cancel the bond calculation task (if any) and wait for it;
    2. cancel the retry tasks and wait for them;
    3. call ``stop()`` on every client, then cancel the listener tasks and
       wait for them;
    4. mark each node's status as disconnected and ``close()`` its client;
    5. clear ``self.clients``, ``self.listener_tasks`` and
       ``self._retry_tasks``.

    A client whose ``close()`` raises is logged and skipped, so the remaining
    clients are still closed and the bookkeeping is still cleared.
    """
    logger.info("Stopping all directory listeners")

    if self._bond_calculation_task:
        self._bond_calculation_task.cancel()
        # Awaiting a cancelled task raises CancelledError here; that is the
        # expected outcome, not a failure.
        with contextlib.suppress(asyncio.CancelledError):
            await self._bond_calculation_task

    for task in self._retry_tasks:
        task.cancel()

    if self._retry_tasks:
        # return_exceptions=True collects the CancelledError results instead
        # of raising them.
        await asyncio.gather(*self._retry_tasks, return_exceptions=True)

    for client in self.clients.values():
        client.stop()

    for task in self.listener_tasks:
        task.cancel()

    if self.listener_tasks:
        await asyncio.gather(*self.listener_tasks, return_exceptions=True)

    for node_id, client in self.clients.items():
        self.node_statuses[node_id].mark_disconnected()
        try:
            await client.close()
        except Exception as exc:
            # Best-effort shutdown: one failing close must not leave the
            # other clients open or the task lists uncleared.
            # CancelledError is a BaseException and still propagates.
            logger.warning(f"Error closing client for {node_id}: {exc}")

    self.clients.clear()
    self.listener_tasks.clear()
    self._retry_tasks.clear()
async def update_orderbook(self) ‑> OrderBook
Expand source code
async def update_orderbook(self) -> OrderBook:
    """Fetch from every directory node and publish a freshly built orderbook.

    All entries of ``self.directory_nodes`` are fetched concurrently. A fetch
    that raises is logged and skipped. A fetch that returns neither offers nor
    bonds is skipped as well. The remaining results are merged into a new
    ``OrderBook``, bond values are calculated on it, and it then replaces
    ``self.current_orderbook`` under ``self._lock``.

    Returns:
        The newly built orderbook.
    """
    fetches = (
        self.fetch_from_directory(onion_address, port)
        for onion_address, port in self.directory_nodes
    )
    # Failures come back as exception objects rather than being raised.
    results = await asyncio.gather(*fetches, return_exceptions=True)

    new_orderbook = OrderBook(timestamp=datetime.now(UTC))

    for result in results:
        if isinstance(result, BaseException):
            logger.error(f"Directory fetch failed: {result}")
            continue

        offers, bonds, node_id = result
        if not (offers or bonds):
            # Nothing to merge from this node.
            continue

        new_orderbook.add_offers(offers, node_id)
        new_orderbook.add_fidelity_bonds(bonds, node_id)

    # Bond values are computed before the lock is taken, so the lock is held
    # only for the reference swap.
    await self._calculate_bond_values(new_orderbook)

    async with self._lock:
        self.current_orderbook = new_orderbook

    logger.info(
        f"Updated orderbook: {len(new_orderbook.offers)} offers, "
        f"{len(new_orderbook.fidelity_bonds)} bonds from "
        f"{len(new_orderbook.directory_nodes)} directory nodes"
    )

    return new_orderbook