Module orderbook_watcher.health_checker

Maker health checking via direct onion connection.

This module provides functionality to verify maker availability by connecting directly to their onion addresses when possible, performing handshakes to extract features, and tracking reachability status.

Classes

class MakerHealthChecker (network: str,
socks_host: str = '127.0.0.1',
socks_port: int = 9050,
timeout: float = 15.0,
check_interval: float = 600.0,
max_concurrent_checks: int = 10)
Expand source code
class MakerHealthChecker:
    """
    Checks maker reachability via direct onion connections.

    This class performs periodic health checks on makers by:
    1. Connecting directly to their onion addresses
    2. Performing handshake to verify they're online
    3. Extracting feature flags from handshake response
    4. Tracking reachability history

    Health checks are rate-limited to avoid stressing makers.
    """

    def __init__(
        self,
        network: str,
        socks_host: str = "127.0.0.1",
        socks_port: int = 9050,
        timeout: float = 15.0,
        check_interval: float = 600.0,  # 10 minutes
        max_concurrent_checks: int = 10,
    ) -> None:
        """
        Initialize MakerHealthChecker.

        Args:
            network: Bitcoin network (mainnet, testnet, signet, regtest)
            socks_host: SOCKS proxy host for Tor
            socks_port: SOCKS proxy port for Tor
            timeout: Connection timeout in seconds
            check_interval: Minimum seconds between checks for same maker
            max_concurrent_checks: Maximum concurrent health checks
        """
        self.network = network
        self.socks_host = socks_host
        self.socks_port = socks_port
        self.timeout = timeout
        self.check_interval = check_interval
        self.max_concurrent_checks = max_concurrent_checks

        # Health status tracking: location -> status
        self.health_status: dict[str, MakerHealthStatus] = {}

        # Semaphore to limit concurrent checks
        self._check_semaphore = asyncio.Semaphore(max_concurrent_checks)

        # Nick identity for handshake (ephemeral)
        self.nick_identity = NickIdentity(JM_VERSION)

    async def check_maker(self, nick: str, location: str, force: bool = False) -> MakerHealthStatus:
        """
        Check if a maker is reachable via direct connection.

        Args:
            nick: Maker's nick
            location: Maker's onion address (format: onion:port)
            force: Force check even if recently checked

        Returns:
            MakerHealthStatus with reachability info and features
        """
        # Check if we should skip this check (rate limiting)
        current_time = time.time()
        if not force and location in self.health_status:
            last_check = self.health_status[location].last_check_time
            if current_time - last_check < self.check_interval:
                logger.debug(
                    f"Skipping health check for {nick} at {location} "
                    f"(checked {current_time - last_check:.0f}s ago)"
                )
                return self.health_status[location]

        async with self._check_semaphore:
            return await self._check_maker_impl(nick, location, current_time)

    async def _check_maker_impl(
        self, nick: str, location: str, current_time: float
    ) -> MakerHealthStatus:
        """Internal implementation of maker health check."""
        # Parse location
        if location == "NOT-SERVING-ONION":
            # Cannot check makers that don't serve onion
            status = MakerHealthStatus(
                location=location,
                nick=nick,
                reachable=False,
                last_check_time=current_time,
                last_success_time=None,
                consecutive_failures=0,
                features=FeatureSet(),
                error="NOT-SERVING-ONION",
            )
            self.health_status[location] = status
            return status

        try:
            host, port_str = location.split(":")
            port = int(port_str)
        except (ValueError, AttributeError) as e:
            logger.warning(f"Invalid location format: {location}: {e}")
            status = MakerHealthStatus(
                location=location,
                nick=nick,
                reachable=False,
                last_check_time=current_time,
                last_success_time=None,
                consecutive_failures=self.health_status.get(
                    location,
                    MakerHealthStatus(
                        location=location,
                        nick=nick,
                        reachable=False,
                        last_check_time=0,
                        last_success_time=None,
                        consecutive_failures=0,
                        features=FeatureSet(),
                    ),
                ).consecutive_failures
                + 1,
                features=FeatureSet(),
                error=f"Invalid location: {e}",
            )
            self.health_status[location] = status
            return status

        # Try to connect and perform handshake
        logger.debug(f"Health check: connecting to {nick} at {location}")
        connection = None
        try:
            # Connect via Tor
            connection = await connect_via_tor(
                host,
                port,
                self.socks_host,
                self.socks_port,
                max_message_size=2097152,
                timeout=self.timeout,
            )

            # Perform handshake
            # Request peerlist_features support to get maker's features from handshake

            our_features = FeatureSet(features={FEATURE_PEERLIST_FEATURES})
            handshake_data = create_handshake_request(
                nick=self.nick_identity.nick,
                location="NOT-SERVING-ONION",
                network=self.network,
                directory=False,
                features=our_features,
            )

            handshake_msg = {
                "type": 793,  # MessageType.HANDSHAKE
                "line": json.dumps(handshake_data),
            }
            await connection.send(json.dumps(handshake_msg).encode("utf-8"))

            # Wait for response
            response_data = await asyncio.wait_for(connection.receive(), timeout=self.timeout)
            response = json.loads(response_data.decode("utf-8"))

            if response.get("type") not in (793, 795):  # HANDSHAKE or DN_HANDSHAKE
                raise Exception(f"Unexpected response type: {response.get('type')}")

            handshake_response = json.loads(response["line"])
            if not handshake_response.get("accepted", False):
                raise Exception("Handshake rejected")

            # Extract features from handshake
            features = FeatureSet.from_handshake(handshake_response)

            # Maker is reachable!
            status = MakerHealthStatus(
                location=location,
                nick=nick,
                reachable=True,
                last_check_time=current_time,
                last_success_time=current_time,
                consecutive_failures=0,
                features=features,
                error=None,
            )
            self.health_status[location] = status
            logger.info(
                f"Health check: {nick} at {location} is REACHABLE "
                f"(features: {features.to_comma_string() or 'none'})"
            )
            return status

        except TimeoutError:
            error = "Connection timeout"
            logger.debug(f"Health check: {nick} at {location} timed out")
        except Exception as e:
            error = str(e)
            logger.debug(f"Health check: {nick} at {location} failed: {e}")

        finally:
            if connection:
                with contextlib.suppress(Exception):
                    await connection.close()

        # Maker is unreachable
        old_status = self.health_status.get(location)
        consecutive_failures = (old_status.consecutive_failures + 1) if old_status else 1
        last_success = old_status.last_success_time if old_status else None

        status = MakerHealthStatus(
            location=location,
            nick=nick,
            reachable=False,
            last_check_time=current_time,
            last_success_time=last_success,
            consecutive_failures=consecutive_failures,
            features=old_status.features if old_status else FeatureSet(),
            error=error,
        )
        self.health_status[location] = status

        if consecutive_failures >= 3:
            logger.warning(
                f"Health check: {nick} at {location} is UNREACHABLE "
                f"({consecutive_failures} consecutive failures, error: {error})"
            )

        return status

    async def check_makers_batch(
        self, makers: list[tuple[str, str]], force: bool = False
    ) -> dict[str, MakerHealthStatus]:
        """
        Check health of multiple makers in parallel.

        Args:
            makers: List of (nick, location) tuples
            force: Force check even if recently checked

        Returns:
            Dict mapping location to MakerHealthStatus
        """
        tasks = [self.check_maker(nick, location, force) for nick, location in makers]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        status_map: dict[str, MakerHealthStatus] = {}
        for (nick, location), result in zip(makers, results, strict=True):
            if isinstance(result, BaseException):
                # Handle both Exception and BaseException (e.g., asyncio.CancelledError)
                logger.error(f"Health check for {nick} at {location} raised exception: {result}")
                status_map[location] = MakerHealthStatus(
                    location=location,
                    nick=nick,
                    reachable=False,
                    last_check_time=time.time(),
                    last_success_time=None,
                    consecutive_failures=self.health_status.get(
                        location,
                        MakerHealthStatus(
                            location=location,
                            nick=nick,
                            reachable=False,
                            last_check_time=0,
                            last_success_time=None,
                            consecutive_failures=0,
                            features=FeatureSet(),
                        ),
                    ).consecutive_failures
                    + 1,
                    features=FeatureSet(),
                    error=str(result),
                )
            else:
                # Type narrowing: result is MakerHealthStatus here
                status_map[location] = result

        return status_map

    def get_unreachable_locations(self, max_consecutive_failures: int = 3) -> set[str]:
        """
        Get set of locations that are considered unreachable.

        Args:
            max_consecutive_failures: Number of failures before marking unreachable

        Returns:
            Set of location strings for unreachable makers
        """
        return {
            location
            for location, status in self.health_status.items()
            if not status.is_healthy(max_consecutive_failures)
        }

    def get_feature_map(self) -> dict[str, FeatureSet]:
        """
        Get map of locations to their feature sets.

        Only includes makers that have been successfully checked.

        Returns:
            Dict mapping location to FeatureSet
        """
        return {
            location: status.features
            for location, status in self.health_status.items()
            if status.last_success_time is not None
        }

    def clear_status(self, location: str) -> None:
        """Clear health status for a location (e.g., when maker reconnects)."""
        self.health_status.pop(location, None)

Checks maker reachability via direct onion connections.

This class performs periodic health checks on makers by:

1. Connecting directly to their onion addresses
2. Performing handshake to verify they're online
3. Extracting feature flags from handshake response
4. Tracking reachability history

Health checks are rate-limited to avoid stressing makers.

Initialize MakerHealthChecker.

Args

network
Bitcoin network (mainnet, testnet, signet, regtest)
socks_host
SOCKS proxy host for Tor
socks_port
SOCKS proxy port for Tor
timeout
Connection timeout in seconds
check_interval
Minimum seconds between checks for same maker
max_concurrent_checks
Maximum concurrent health checks

Methods

async def check_maker(self, nick: str, location: str, force: bool = False) ‑> MakerHealthStatus
Expand source code
async def check_maker(self, nick: str, location: str, force: bool = False) -> MakerHealthStatus:
    """
    Probe a maker over a direct connection, honouring the rate limit.

    Args:
        nick: Maker's nick
        location: Maker's onion address (format: onion:port)
        force: Run the check even if the location was checked recently

    Returns:
        The stored MakerHealthStatus when the location was checked less than
        ``check_interval`` seconds ago and ``force`` is false; otherwise the
        status produced by a fresh check.
    """
    current_time = time.time()

    # Rate limiting: reuse the stored result while it is still fresh.
    cached = None if force else self.health_status.get(location)
    if cached is not None:
        elapsed = current_time - cached.last_check_time
        if elapsed < self.check_interval:
            logger.debug(
                f"Skipping health check for {nick} at {location} "
                f"(checked {elapsed:.0f}s ago)"
            )
            return cached

    # The semaphore bounds how many checks run at the same time.
    async with self._check_semaphore:
        fresh = await self._check_maker_impl(nick, location, current_time)
    return fresh

Check if a maker is reachable via direct connection.

Args

nick
Maker's nick
location
Maker's onion address (format: onion:port)
force
Force check even if recently checked

Returns

MakerHealthStatus with reachability info and features

async def check_makers_batch(self, makers: list[tuple[str, str]], force: bool = False) ‑> dict[str, MakerHealthStatus]
Expand source code
async def check_makers_batch(
    self, makers: list[tuple[str, str]], force: bool = False
) -> dict[str, MakerHealthStatus]:
    """
    Check health of multiple makers in parallel.

    Args:
        makers: List of (nick, location) tuples
        force: Force check even if recently checked

    Returns:
        Dict mapping location to MakerHealthStatus
    """
    tasks = [self.check_maker(nick, location, force) for nick, location in makers]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    status_map: dict[str, MakerHealthStatus] = {}
    for (nick, location), result in zip(makers, results, strict=True):
        if isinstance(result, BaseException):
            # Handle both Exception and BaseException (e.g., asyncio.CancelledError)
            logger.error(f"Health check for {nick} at {location} raised exception: {result}")
            status_map[location] = MakerHealthStatus(
                location=location,
                nick=nick,
                reachable=False,
                last_check_time=time.time(),
                last_success_time=None,
                consecutive_failures=self.health_status.get(
                    location,
                    MakerHealthStatus(
                        location=location,
                        nick=nick,
                        reachable=False,
                        last_check_time=0,
                        last_success_time=None,
                        consecutive_failures=0,
                        features=FeatureSet(),
                    ),
                ).consecutive_failures
                + 1,
                features=FeatureSet(),
                error=str(result),
            )
        else:
            # Type narrowing: result is MakerHealthStatus here
            status_map[location] = result

    return status_map

Check health of multiple makers in parallel.

Args

makers
List of (nick, location) tuples
force
Force check even if recently checked

Returns

Dict mapping location to MakerHealthStatus

def clear_status(self, location: str) ‑> None
Expand source code
def clear_status(self, location: str) -> None:
    """Forget the stored health status for ``location``; unknown locations are ignored."""
    if location in self.health_status:
        del self.health_status[location]

Clear health status for a location (e.g., when maker reconnects).

def get_feature_map(self) ‑> dict[str, FeatureSet]
Expand source code
def get_feature_map(self) -> dict[str, FeatureSet]:
    """
    Collect the feature sets of makers that have been reached at least once.

    A location is included only when its stored status has a
    ``last_success_time``; all other locations are left out.

    Returns:
        Dict mapping location to FeatureSet
    """
    feature_map: dict[str, FeatureSet] = {}
    for location, status in self.health_status.items():
        if status.last_success_time is None:
            # Never reached successfully: no features to report.
            continue
        feature_map[location] = status.features
    return feature_map

Get map of locations to their feature sets.

Only includes makers that have been successfully checked.

Returns

Dict mapping location to FeatureSet

def get_unreachable_locations(self, max_consecutive_failures: int = 3) ‑> set[str]
Expand source code
def get_unreachable_locations(self, max_consecutive_failures: int = 3) -> set[str]:
    """
    Collect the locations whose stored status is not healthy.

    A location is reported when ``status.is_healthy(max_consecutive_failures)``
    returns a false value for its stored status.

    Args:
        max_consecutive_failures: Threshold forwarded to ``is_healthy``

    Returns:
        Set of location strings for unreachable makers
    """
    unreachable: set[str] = set()
    for location, status in self.health_status.items():
        if status.is_healthy(max_consecutive_failures):
            continue
        unreachable.add(location)
    return unreachable

Get set of locations that are considered unreachable.

Args

max_consecutive_failures
Number of failures before marking unreachable

Returns

Set of location strings for unreachable makers

class MakerHealthStatus (location: str,
nick: str,
reachable: bool,
last_check_time: float,
last_success_time: float | None,
consecutive_failures: int,
features: FeatureSet,
error: str | None = None)
Expand source code
@dataclass
class MakerHealthStatus:
    """Record describing the outcome of a maker health check."""

    # Maker address in "onion:port" form.
    location: str
    # Maker's nick.
    nick: str
    # Whether the check that produced this record reached the maker.
    reachable: bool
    # Timestamp of the check that produced this record.
    last_check_time: float
    # Timestamp of the most recent successful check, or None if there was none.
    last_success_time: float | None
    # Number of failed checks in a row.
    consecutive_failures: int
    # Feature set associated with the maker.
    features: FeatureSet
    # Failure description; None when there is no error.
    error: str | None = None

    def is_healthy(self, max_consecutive_failures: int = 3) -> bool:
        """
        Report whether the maker counts as healthy.

        The maker is healthy only when it is reachable and its consecutive
        failure count is below ``max_consecutive_failures``.
        """
        if not self.reachable:
            return False
        return self.consecutive_failures < max_consecutive_failures

Health status for a maker.

Instance variables

var consecutive_failures : int

Number of consecutive failed health checks recorded for this maker; it is reset to 0 by a successful check.

var error : str | None

Description of the most recent failure, or None when the last check succeeded.

var features : FeatureSet

Feature set extracted from the maker's handshake response; a failed check keeps the previously stored features.

var last_check_time : float

Timestamp (from time.time()) of the most recent health check.

var last_success_time : float | None

Timestamp of the most recent successful health check, or None if no check has succeeded.

var location : str

Maker's onion address in onion:port form.

var nick : str

Maker's nick.

var reachable : bool

Whether the most recent health check reached the maker.

Methods

def is_healthy(self, max_consecutive_failures: int = 3) ‑> bool
Expand source code
def is_healthy(self, max_consecutive_failures: int = 3) -> bool:
    """
    Report whether the maker counts as healthy.

    The maker is healthy only when it is reachable and its consecutive
    failure count is below ``max_consecutive_failures``.
    """
    if not self.reachable:
        return False
    return self.consecutive_failures < max_consecutive_failures

Check if maker is considered healthy.