Module orderbook_watcher.server
HTTP server for serving static files and orderbook data.
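A minimal usage sketch of running the server, assuming Settings and OrderbookAggregator are provided elsewhere in the package (the module paths and constructor arguments below are illustrative, not confirmed by this module):

    import asyncio

    from orderbook_watcher.config import Settings                  # assumed location
    from orderbook_watcher.aggregator import OrderbookAggregator   # assumed location
    from orderbook_watcher.server import OrderbookServer


    async def main() -> None:
        settings = Settings()                        # hypothetical construction; real arguments may differ
        aggregator = OrderbookAggregator(settings)   # hypothetical construction; real arguments may differ
        server = OrderbookServer(settings, aggregator)
        await server.start()
        try:
            await asyncio.Event().wait()  # keep serving until the task is cancelled (e.g. Ctrl-C)
        finally:
            await server.stop()


    if __name__ == "__main__":
        asyncio.run(main())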
Classes
class OrderbookServer(settings: Settings, aggregator: OrderbookAggregator)
class OrderbookServer:
    def __init__(self, settings: Settings, aggregator: OrderbookAggregator) -> None:
        self.settings = settings
        self.aggregator = aggregator
        self.app = web.Application()
        self.runner: web.AppRunner | None = None
        self.site: web.TCPSite | None = None
        self._update_task: asyncio.Task[Any] | None = None
        self._cached_orderbook: str | None = None
        self._cache_lock = asyncio.Lock()
        self._background_update_task: asyncio.Task[Any] | None = None
        self._stopping = False
        self._setup_routes()

    def _setup_routes(self) -> None:
        self.app.router.add_get("/", self._handle_index)
        self.app.router.add_get("/orderbook.json", self._handle_orderbook_json)
        self.app.router.add_get("/health", self._handle_health)
        static_dir = Path(__file__).parent.parent.parent / "static"
        if static_dir.exists():
            self.app.router.add_static("/static/", path=static_dir, name="static")

    async def _handle_index(self, _request: web.Request) -> web.Response | web.FileResponse:
        static_dir = Path(__file__).parent.parent.parent / "static"
        index_file = static_dir / "index.html"
        if index_file.exists():
            return web.FileResponse(index_file)
        return web.Response(text="Orderbook Watcher", status=200)

    async def _handle_orderbook_json(self, _request: web.Request) -> web.Response:
        async with self._cache_lock:
            if self._cached_orderbook:
                return web.Response(text=self._cached_orderbook, content_type="application/json")
        orderbook = await self.aggregator.get_live_orderbook()
        if orderbook is None:
            return web.json_response({"error": "Orderbook not available"}, status=503)
        data = self._format_orderbook(orderbook)
        json_str = json.dumps(data)
        async with self._cache_lock:
            self._cached_orderbook = json_str
        return web.Response(text=json_str, content_type="application/json")

    def _format_orderbook(self, orderbook: OrderBook) -> dict[str, Any]:
        offers_by_directory = orderbook.get_offers_by_directory()
        directory_stats = {}
        for node, offers in offers_by_directory.items():
            bond_offers = [o for o in offers if o.fidelity_bond_data]
            directory_stats[node] = {
                "offer_count": len(offers),
                "bond_offer_count": len(bond_offers),
            }
        for node_id in self.aggregator.directory_nodes:
            node_str = f"{node_id[0]}:{node_id[1]}"
            if node_str not in directory_stats:
                directory_stats[node_str] = {"offer_count": 0, "bond_offer_count": 0}
        for status_node_id, status in self.aggregator.node_statuses.items():
            if status_node_id in directory_stats:
                directory_stats[status_node_id].update(status.to_dict(orderbook.timestamp))

        grouped_offers: dict[tuple[str, int], dict[str, Any]] = {}
        for offer in orderbook.offers:
            key = (offer.counterparty, offer.oid)
            if key not in grouped_offers:
                grouped_offers[key] = {
                    "counterparty": offer.counterparty,
                    "oid": offer.oid,
                    "ordertype": offer.ordertype.value,
                    "minsize": offer.minsize,
                    "maxsize": offer.maxsize,
                    "txfee": offer.txfee,
                    "cjfee": offer.cjfee,
                    "fidelity_bond_value": offer.fidelity_bond_value,
                    "directory_nodes": [offer.directory_node] if offer.directory_node else [],
                    "fidelity_bond_data": offer.fidelity_bond_data,
                    "features": offer.features,
                }
            elif (
                offer.directory_node
                and offer.directory_node not in grouped_offers[key]["directory_nodes"]
            ):
                grouped_offers[key]["directory_nodes"].append(offer.directory_node)
                # Merge features from multiple directories
                for feature, value in offer.features.items():
                    if value:
                        grouped_offers[key]["features"][feature] = value

        # Calculate feature statistics
        feature_stats: dict[str, int] = {}
        unique_makers = set()
        for offer_data in grouped_offers.values():
            counterparty = offer_data["counterparty"]
            if counterparty not in unique_makers:
                unique_makers.add(counterparty)
                features = offer_data.get("features", {})
                for feature, value in features.items():
                    if value:
                        feature_stats[feature] = feature_stats.get(feature, 0) + 1
                # Track makers without any features (legacy/reference implementation)
                if not features:
                    feature_stats["legacy"] = feature_stats.get("legacy", 0) + 1

        return {
            "timestamp": orderbook.timestamp.isoformat(),
            "offers": list(grouped_offers.values()),
            "fidelitybonds": [
                {
                    "counterparty": bond.counterparty,
                    "utxo": {"txid": bond.utxo_txid, "vout": bond.utxo_vout},
                    "bond_value": bond.bond_value,
                    "locktime": bond.locktime,
                    "amount": bond.amount,
                    "script": bond.script,
                    "utxo_confirmations": bond.utxo_confirmations,
                    "utxo_confirmation_timestamp": bond.utxo_confirmation_timestamp,
                    "cert_expiry": bond.cert_expiry,
                    "directory_node": bond.directory_node,
                }
                for bond in orderbook.fidelity_bonds
            ],
            "directory_nodes": orderbook.directory_nodes,
            "directory_stats": directory_stats,
            "feature_stats": feature_stats,
            "mempool_url": self.settings.mempool_web_url
            or self.settings.mempool_api_url.replace("/api", ""),
            "mempool_onion_url": self.settings.mempool_web_onion_url,
        }

    async def _handle_health(self, _request: web.Request) -> web.Response:
        orderbook = await self.aggregator.get_orderbook()
        return web.json_response(
            {
                "status": "healthy",
                "offers": len(orderbook.offers),
                "fidelity_bonds": len(orderbook.fidelity_bonds),
                "directory_nodes": len(orderbook.directory_nodes),
                "last_update": orderbook.timestamp.isoformat(),
            }
        )

    async def _update_cache_loop(self) -> None:
        await asyncio.sleep(2)
        last_hash = 0
        while True:
            try:
                orderbook = await self.aggregator.get_live_orderbook()
                current_hash = hash(
                    (
                        tuple((o.counterparty, o.oid, o.directory_node) for o in orderbook.offers),
                        tuple((b.utxo_txid, b.utxo_vout) for b in orderbook.fidelity_bonds),
                    )
                )
                if current_hash != last_hash:
                    data = self._format_orderbook(orderbook)
                    json_str = json.dumps(data)
                    async with self._cache_lock:
                        self._cached_orderbook = json_str
                    logger.debug(f"Cache updated: {len(orderbook.offers)} offers")
                    last_hash = current_hash
            except Exception as e:
                logger.error(f"Error updating cache: {e}")
            await asyncio.sleep(30)

    async def start(self) -> None:
        logger.info(
            f"Starting orderbook server on {self.settings.http_host}:{self.settings.http_port}"
        )
        self.runner = web.AppRunner(self.app)
        await self.runner.setup()
        self.site = web.TCPSite(self.runner, self.settings.http_host, self.settings.http_port)
        await self.site.start()
        logger.info("Starting continuous directory listeners...")
        await self.aggregator.start_continuous_listening()
        self._background_update_task = asyncio.create_task(self._update_cache_loop())
        logger.info(
            f"Orderbook server running at http://{self.settings.http_host}:{self.settings.http_port}"
        )

    async def stop(self) -> None:
        if self._stopping:
            return
        self._stopping = True
        logger.info("Stopping orderbook server...")
        if self._background_update_task:
            self._background_update_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._background_update_task
            self._background_update_task = None
        logger.info("Stopping directory listeners...")
        await self.aggregator.stop_listening()
        if self.site:
            with contextlib.suppress(RuntimeError):
                await self.site.stop()
            self.site = None
        if self.runner:
            with contextlib.suppress(RuntimeError):
                await self.runner.cleanup()
            self.runner = None
        logger.info("Orderbook server stopped")

Methods
async def start(self) -> None
async def start(self) -> None:
    logger.info(
        f"Starting orderbook server on {self.settings.http_host}:{self.settings.http_port}"
    )
    self.runner = web.AppRunner(self.app)
    await self.runner.setup()
    self.site = web.TCPSite(self.runner, self.settings.http_host, self.settings.http_port)
    await self.site.start()
    logger.info("Starting continuous directory listeners...")
    await self.aggregator.start_continuous_listening()
    self._background_update_task = asyncio.create_task(self._update_cache_loop())
    logger.info(
        f"Orderbook server running at http://{self.settings.http_host}:{self.settings.http_port}"
    )

async def stop(self) -> None
async def stop(self) -> None:
    if self._stopping:
        return
    self._stopping = True
    logger.info("Stopping orderbook server...")
    if self._background_update_task:
        self._background_update_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._background_update_task
        self._background_update_task = None
    logger.info("Stopping directory listeners...")
    await self.aggregator.stop_listening()
    if self.site:
        with contextlib.suppress(RuntimeError):
            await self.site.stop()
        self.site = None
    if self.runner:
        with contextlib.suppress(RuntimeError):
            await self.runner.cleanup()
        self.runner = None
    logger.info("Orderbook server stopped")
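For reference, a client-side sketch that exercises the routes registered in _setup_routes. The response fields follow the handlers shown above; the base URL is an assumption, since the actual bind host and port come from Settings:

    import asyncio

    import aiohttp


    async def check_server(base_url: str = "http://127.0.0.1:8080") -> None:  # host/port are assumptions
        async with aiohttp.ClientSession() as session:
            # /health reports offer, bond, and directory-node counts plus the last update time
            async with session.get(f"{base_url}/health") as resp:
                print(await resp.json())
            # /orderbook.json serves the cached, grouped orderbook snapshot
            async with session.get(f"{base_url}/orderbook.json") as resp:
                if resp.status == 503:
                    print("Orderbook not available yet")
                else:
                    data = await resp.json()
                    print(len(data["offers"]), "offers,", len(data["fidelitybonds"]), "fidelity bonds")


    asyncio.run(check_server())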