Module directory_server.peer_registry
Peer registry for tracking active peers and their metadata.
Implements Single Responsibility Principle: only manages peer state.
Classes
class PeerNotFoundError (*args, **kwargs)-
Expand source code
class PeerNotFoundError(Exception):
    pass

Common base class for all non-exit exceptions.
Ancestors
- builtins.Exception
- builtins.BaseException
class PeerRegistry (max_peers: int = 1000)-
Expand source code
class PeerRegistry: def __init__(self, max_peers: int = 1000): self.max_peers = max_peers self._peers: dict[str, PeerInfo] = {} self._nick_to_key: dict[str, str] = {} def register(self, peer: PeerInfo) -> None: if len(self._peers) >= self.max_peers: raise ValueError(f"Maximum peers reached: {self.max_peers}") location = peer.location_string key = peer.nick if location == "NOT-SERVING-ONION" else location self._peers[key] = peer if peer.nick: self._nick_to_key[peer.nick] = key peer.last_seen = datetime.now(UTC) logger.info(f"Registered peer: {peer.nick} at {location}") def unregister(self, key: str) -> None: if key not in self._peers: return peer = self._peers[key] if peer.nick in self._nick_to_key: del self._nick_to_key[peer.nick] del self._peers[key] logger.info(f"Unregistered peer: {peer.nick} at {peer.location_string}") def get_by_key(self, key: str) -> PeerInfo | None: return self._peers.get(key) def get_by_location(self, location: str) -> PeerInfo | None: return self._peers.get(location) def get_by_nick(self, nick: str) -> PeerInfo | None: key = self._nick_to_key.get(nick) if key: return self._peers.get(key) return None def update_status(self, key: str, status: PeerStatus) -> None: peer = self.get_by_key(key) if peer: peer.status = status if status in (PeerStatus.CONNECTED, PeerStatus.HANDSHAKED): peer.last_seen = datetime.now(UTC) def _iter_connected(self, network: NetworkType | None = None) -> Iterator[PeerInfo]: """Iterator over connected peers. Creates a snapshot of peers to avoid RuntimeError if dict is modified during iteration. 
""" for p in list(self._peers.values()): if ( p.status == PeerStatus.HANDSHAKED and not p.is_directory and (network is None or p.network == network) ): yield p def iter_connected(self, network: NetworkType | None = None) -> Iterator[PeerInfo]: """Public memory-efficient iterator over connected peers.""" return self._iter_connected(network) def get_all_connected(self, network: NetworkType | None = None) -> list[PeerInfo]: return list(self._iter_connected(network)) def get_peerlist_for_network(self, network: NetworkType) -> list[tuple[str, str]]: # Use generator to avoid intermediate list # Include all connected peers, even NOT-SERVING-ONION # While they can't be directly connected to, they are reachable via the directory # for private messages, so this information is useful return [(peer.nick, peer.location_string) for peer in self._iter_connected(network)] def get_peerlist_with_features(self, network: NetworkType) -> list[tuple[str, str, FeatureSet]]: """ Get peerlist with features for peers on a network. Returns list of (nick, location, features) tuples for connected peers. Includes all peers, even NOT-SERVING-ONION, as they are still reachable via the directory for private messaging. """ result = [] for peer in self._iter_connected(network): # Build FeatureSet from peer.features dict features = FeatureSet(features={k for k, v in peer.features.items() if v is True}) result.append((peer.nick, peer.location_string, features)) return result def count(self) -> int: return len(self._peers) def clear(self) -> None: self._peers.clear() self._nick_to_key.clear() def get_passive_peers(self, network: NetworkType | None = None) -> list[PeerInfo]: """ Get passive peers (NOT-SERVING-ONION). These are typically orderbook watchers/takers that don't host their own onion service but connect to the directory to watch offers. 
""" return [p for p in self._iter_connected(network) if p.onion_address == "NOT-SERVING-ONION"] def get_active_peers(self, network: NetworkType | None = None) -> list[PeerInfo]: """ Get active peers (serving onion address). These are typically makers that host their own onion service and publish offers to the orderbook. """ return [p for p in self._iter_connected(network) if p.onion_address != "NOT-SERVING-ONION"] def get_stats(self) -> dict[str, int]: connected = 0 passive = 0 active = 0 neutrino_compat = 0 peerlist_features = 0 push_encrypted = 0 for p in list(self._peers.values()): if p.status == PeerStatus.HANDSHAKED and not p.is_directory: connected += 1 if p.onion_address == "NOT-SERVING-ONION": passive += 1 else: active += 1 # Count feature support from features dict features = p.features if features.get("neutrino_compat"): neutrino_compat += 1 if features.get("peerlist_features"): peerlist_features += 1 if features.get("push_encrypted"): push_encrypted += 1 return { "total_peers": len(self._peers), "connected_peers": connected, "passive_peers": passive, "active_peers": active, "neutrino_compat_peers": neutrino_compat, "peerlist_features_peers": peerlist_features, "push_encrypted_peers": push_encrypted, } def get_neutrino_compat_peers(self, network: NetworkType | None = None) -> list[PeerInfo]: """ Get peers that support neutrino_compat feature. These peers advertise extended UTXO metadata (scriptpubkey, blockheight) which is required for Neutrino backend verification. """ return [p for p in self._iter_connected(network) if p.neutrino_compat]Methods
def clear(self) ‑> None-
Expand source code
def clear(self) -> None:
    self._peers.clear()
    self._nick_to_key.clear()

def count(self) ‑> int-
Expand source code
def count(self) -> int:
    return len(self._peers)

def get_active_peers(self,
network: NetworkType | None = None) ‑> list[PeerInfo]-
Expand source code
def get_active_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
    """
    Get active peers (serving onion address).

    These are typically makers that host their own onion service
    and publish offers to the orderbook.
    """
    return [p for p in self._iter_connected(network) if p.onion_address != "NOT-SERVING-ONION"]

Get active peers (serving onion address).
These are typically makers that host their own onion service and publish offers to the orderbook.
def get_all_connected(self,
network: NetworkType | None = None) ‑> list[PeerInfo]-
Expand source code
def get_all_connected(self, network: NetworkType | None = None) -> list[PeerInfo]:
    return list(self._iter_connected(network))

def get_by_key(self, key: str) ‑> PeerInfo | None-
Expand source code
def get_by_key(self, key: str) -> PeerInfo | None:
    return self._peers.get(key)

def get_by_location(self, location: str) ‑> PeerInfo | None-
Expand source code
def get_by_location(self, location: str) -> PeerInfo | None:
    return self._peers.get(location)

def get_by_nick(self, nick: str) ‑> PeerInfo | None-
Expand source code
def get_by_nick(self, nick: str) -> PeerInfo | None:
    key = self._nick_to_key.get(nick)
    if key:
        return self._peers.get(key)
    return None

def get_neutrino_compat_peers(self,
network: NetworkType | None = None) ‑> list[PeerInfo]-
Expand source code
def get_neutrino_compat_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
    """
    Get peers that support neutrino_compat feature.

    These peers advertise extended UTXO metadata (scriptpubkey, blockheight)
    which is required for Neutrino backend verification.
    """
    return [p for p in self._iter_connected(network) if p.neutrino_compat]

Get peers that support neutrino_compat feature.
These peers advertise extended UTXO metadata (scriptpubkey, blockheight) which is required for Neutrino backend verification.
def get_passive_peers(self,
network: NetworkType | None = None) ‑> list[PeerInfo]-
Expand source code
def get_passive_peers(self, network: NetworkType | None = None) -> list[PeerInfo]:
    """
    Get passive peers (NOT-SERVING-ONION).

    These are typically orderbook watchers/takers that don't host their own
    onion service but connect to the directory to watch offers.
    """
    return [p for p in self._iter_connected(network) if p.onion_address == "NOT-SERVING-ONION"]

Get passive peers (NOT-SERVING-ONION).
These are typically orderbook watchers/takers that don't host their own onion service but connect to the directory to watch offers.
def get_peerlist_for_network(self,
network: NetworkType) ‑> list[tuple[str, str]]-
Expand source code
def get_peerlist_for_network(self, network: NetworkType) -> list[tuple[str, str]]:
    # Use generator to avoid intermediate list
    # Include all connected peers, even NOT-SERVING-ONION
    # While they can't be directly connected to, they are reachable via the directory
    # for private messages, so this information is useful
    return [(peer.nick, peer.location_string) for peer in self._iter_connected(network)]

def get_peerlist_with_features(self,
network: NetworkType) ‑> list[tuple[str, str, FeatureSet]]-
Expand source code
def get_peerlist_with_features(self, network: NetworkType) -> list[tuple[str, str, FeatureSet]]:
    """
    Get peerlist with features for peers on a network.

    Returns list of (nick, location, features) tuples for connected peers.
    Includes all peers, even NOT-SERVING-ONION, as they are still reachable
    via the directory for private messaging.
    """
    result = []
    for peer in self._iter_connected(network):
        # Build FeatureSet from peer.features dict
        features = FeatureSet(features={k for k, v in peer.features.items() if v is True})
        result.append((peer.nick, peer.location_string, features))
    return result

Get peerlist with features for peers on a network.
Returns list of (nick, location, features) tuples for connected peers. Includes all peers, even NOT-SERVING-ONION, as they are still reachable via the directory for private messaging.
def get_stats(self) ‑> dict[str, int]-
Expand source code
def get_stats(self) -> dict[str, int]:
    connected = 0
    passive = 0
    active = 0
    neutrino_compat = 0
    peerlist_features = 0
    push_encrypted = 0
    for p in list(self._peers.values()):
        if p.status == PeerStatus.HANDSHAKED and not p.is_directory:
            connected += 1
            if p.onion_address == "NOT-SERVING-ONION":
                passive += 1
            else:
                active += 1
            # Count feature support from features dict
            features = p.features
            if features.get("neutrino_compat"):
                neutrino_compat += 1
            if features.get("peerlist_features"):
                peerlist_features += 1
            if features.get("push_encrypted"):
                push_encrypted += 1
    return {
        "total_peers": len(self._peers),
        "connected_peers": connected,
        "passive_peers": passive,
        "active_peers": active,
        "neutrino_compat_peers": neutrino_compat,
        "peerlist_features_peers": peerlist_features,
        "push_encrypted_peers": push_encrypted,
    }

def iter_connected(self,
network: NetworkType | None = None) ‑> Iterator[PeerInfo]-
Expand source code
def iter_connected(self, network: NetworkType | None = None) -> Iterator[PeerInfo]:
    """Public memory-efficient iterator over connected peers."""
    return self._iter_connected(network)

Public memory-efficient iterator over connected peers.
def register(self,
peer: PeerInfo) ‑> None-
Expand source code
def register(self, peer: PeerInfo) -> None:
    if len(self._peers) >= self.max_peers:
        raise ValueError(f"Maximum peers reached: {self.max_peers}")
    location = peer.location_string
    key = peer.nick if location == "NOT-SERVING-ONION" else location
    self._peers[key] = peer
    if peer.nick:
        self._nick_to_key[peer.nick] = key
    peer.last_seen = datetime.now(UTC)
    logger.info(f"Registered peer: {peer.nick} at {location}")

def unregister(self, key: str) ‑> None-
Expand source code
def unregister(self, key: str) -> None:
    if key not in self._peers:
        return
    peer = self._peers[key]
    if peer.nick in self._nick_to_key:
        del self._nick_to_key[peer.nick]
    del self._peers[key]
    logger.info(f"Unregistered peer: {peer.nick} at {peer.location_string}")

def update_status(self,
key: str,
status: PeerStatus) ‑> None-
Expand source code
def update_status(self, key: str, status: PeerStatus) -> None:
    peer = self.get_by_key(key)
    if peer:
        peer.status = status
        if status in (PeerStatus.CONNECTED, PeerStatus.HANDSHAKED):
            peer.last_seen = datetime.now(UTC)