Module orderbook_watcher.main

Main entry point for the orderbook watcher.

Functions

def main() ‑> None
Expand source code
def main() -> None:
    parser = argparse.ArgumentParser(description="JoinMarket Orderbook Watcher")
    parser.add_argument(
        "--log-level",
        "-l",
        default=None,
        help="Log level (default: from config or INFO)",
    )
    args = parser.parse_args()

    try:
        asyncio.run(run_watcher(log_level=args.log_level))
    except KeyboardInterrupt:
        logger.info("Interrupted by user")
    except Exception as e:
        logger.exception(f"Fatal error: {e}")
        sys.exit(1)
async def run_watcher(log_level: str | None = None) ‑> None
Expand source code
async def run_watcher(log_level: str | None = None) -> None:
    settings = get_settings()
    # Use CLI log level if provided, otherwise fall back to settings
    effective_log_level = log_level if log_level else settings.logging.level
    setup_logging(effective_log_level)

    network = settings.network_config.network
    watcher_settings = settings.orderbook_watcher
    data_dir = settings.get_data_dir()

    # Generate a nick for the orderbook watcher
    nick_identity = NickIdentity(JM_VERSION)
    watcher_nick = nick_identity.nick

    logger.info("=" * 80)
    logger.info("Starting JoinMarket Orderbook Watcher")
    logger.info(f"Network: {network.value}")
    logger.info(f"Nick: {watcher_nick}")
    logger.info(f"HTTP server: {watcher_settings.http_host}:{watcher_settings.http_port}")
    logger.info(f"Update interval: {watcher_settings.update_interval}s")
    logger.info(f"Mempool API: {watcher_settings.mempool_api_url}")

    # Directory nodes from env var (DIRECTORY_NODES) or config
    directory_nodes_str = os.environ.get("DIRECTORY_NODES", "")
    if not directory_nodes_str:
        # Fall back to directory servers from network config
        if settings.network_config.directory_servers:
            directory_nodes_str = ",".join(settings.network_config.directory_servers)
        else:
            # Use default directory servers
            directory_nodes_str = ",".join(settings.get_directory_servers())

    directory_nodes = get_directory_nodes(directory_nodes_str)
    if not directory_nodes:
        logger.error("No directory nodes configured. Set DIRECTORY_NODES environment variable.")
        logger.error("Example: DIRECTORY_NODES=node1.onion:5222,node2.onion:5222")
        sys.exit(1)

    logger.info(f"Directory nodes: {len(directory_nodes)}")
    for node in directory_nodes:
        logger.info(f"  - {node[0]}:{node[1]}")
    logger.info("=" * 80)

    # Write nick state file for external tracking
    write_nick_state(data_dir, "orderbook", watcher_nick)
    logger.info(f"Nick state written to {data_dir}/state/orderbook.nick")

    aggregator = OrderbookAggregator(
        directory_nodes=directory_nodes,
        network=network.value,
        socks_host=settings.tor.socks_host,
        socks_port=settings.tor.socks_port,
        timeout=watcher_settings.connection_timeout,
        mempool_api_url=watcher_settings.mempool_api_url,
        max_message_size=watcher_settings.max_message_size,
        uptime_grace_period=watcher_settings.uptime_grace_period,
    )

    server = OrderbookServer(watcher_settings, aggregator)

    loop = asyncio.get_running_loop()
    shutdown_event = asyncio.Event()

    def shutdown_handler() -> None:
        logger.info("Received shutdown signal")
        shutdown_event.set()

    for sig in (signal.SIGTERM, signal.SIGINT):
        loop.add_signal_handler(sig, shutdown_handler)

    try:
        # Send startup notification immediately (including nick)
        notifier = get_notifier(settings, component_name="Orderbook")
        await notifier.notify_startup(
            component="Orderbook Watcher",
            network=network.value,
            nick=watcher_nick,
        )
        await server.start()
        await shutdown_event.wait()
    except asyncio.CancelledError:
        logger.info("Watcher cancelled")
    except Exception as e:
        logger.error(f"Watcher error: {e}")
        raise
    finally:
        # Clean up nick state file on shutdown
        remove_nick_state(data_dir, "orderbook")
        await server.stop()
def setup_logging(level: str) ‑> None
Expand source code
def setup_logging(level: str) -> None:
    logger.remove()

    logger.add(
        sys.stderr,
        format="<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>",
        level=level,
        colorize=True,
    )