Module jmcore.network
Network primitives and connection management.
Functions
async def connect_direct(host: str, port: int, max_message_size: int = 2097152, timeout: float = 30.0) ‑> TCPConnection-
Expand source code
async def connect_direct( host: str, port: int, max_message_size: int = 2097152, # 2MB timeout: float = 30.0, ) -> TCPConnection: """Connect directly via TCP without Tor (for local development/testing).""" try: logger.info(f"Connecting directly to {host}:{port}") reader, writer = await asyncio.wait_for( asyncio.open_connection(host, port, limit=max_message_size), timeout=timeout, ) logger.info(f"Connected to {host}:{port}") return TCPConnection(reader, writer, max_message_size) except Exception as e: logger.error(f"Failed to connect to {host}:{port}: {e}") raise ConnectionError(f"Direct connection failed: {e}") from eConnect directly via TCP without Tor (for local development/testing).
async def connect_via_tor(onion_address: str,
port: int,
socks_host: str = '127.0.0.1',
socks_port: int = 9050,
max_message_size: int = 2097152,
timeout: float = 30.0) ‑> TCPConnection-
Expand source code
async def connect_via_tor( onion_address: str, port: int, socks_host: str = "127.0.0.1", socks_port: int = 9050, max_message_size: int = 2097152, # 2MB timeout: float = 30.0, ) -> TCPConnection: try: import socket import socks sock = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM) sock.set_proxy(socks.SOCKS5, socks_host, socks_port) sock.settimeout(timeout) logger.info(f"Connecting to {onion_address}:{port} via Tor ({socks_host}:{socks_port})") await asyncio.get_event_loop().run_in_executor(None, sock.connect, (onion_address, port)) sock.setblocking(False) reader, writer = await asyncio.open_connection(sock=sock, limit=max_message_size) logger.info(f"Connected to {onion_address}:{port}") return TCPConnection(reader, writer, max_message_size) except Exception as e: logger.error(f"Failed to connect to {onion_address}:{port} via Tor: {e}") raise ConnectionError(f"Tor connection failed: {e}") from e
Classes
class Connection-
Expand source code
class Connection(ABC): @abstractmethod async def send(self, data: bytes) -> None: pass @abstractmethod async def receive(self) -> bytes: pass @abstractmethod async def close(self) -> None: pass @abstractmethod def is_connected(self) -> bool: passHelper class that provides a standard way to create an ABC using inheritance.
Ancestors
- abc.ABC
Subclasses
Methods
async def close(self) ‑> None-
Expand source code
@abstractmethod async def close(self) -> None: pass def is_connected(self) ‑> bool-
Expand source code
@abstractmethod def is_connected(self) -> bool: pass async def receive(self) ‑> bytes-
Expand source code
@abstractmethod async def receive(self) -> bytes: pass async def send(self, data: bytes) ‑> None-
Expand source code
@abstractmethod async def send(self, data: bytes) -> None: pass
class ConnectionError (*args, **kwargs)-
Expand source code
class ConnectionError(Exception): passCommon base class for all non-exit exceptions.
Ancestors
- builtins.Exception
- builtins.BaseException
class ConnectionPool (max_connections: int = 1000)-
Expand source code
class ConnectionPool: def __init__(self, max_connections: int = 1000): self.max_connections = max_connections self.connections: dict[str, Connection] = {} def add(self, peer_id: str, connection: Connection) -> None: if len(self.connections) >= self.max_connections: raise ConnectionError(f"Connection pool full ({self.max_connections})") self.connections[peer_id] = connection def get(self, peer_id: str) -> Connection | None: return self.connections.get(peer_id) def remove(self, peer_id: str) -> None: if peer_id in self.connections: del self.connections[peer_id] async def close_all(self) -> None: connections_snapshot = list(self.connections.values()) for conn in connections_snapshot: await conn.close() self.connections.clear() def __len__(self) -> int: return len(self.connections)Methods
def add(self,
peer_id: str,
connection: Connection) ‑> None-
Expand source code
def add(self, peer_id: str, connection: Connection) -> None: if len(self.connections) >= self.max_connections: raise ConnectionError(f"Connection pool full ({self.max_connections})") self.connections[peer_id] = connection async def close_all(self) ‑> None-
Expand source code
async def close_all(self) -> None: connections_snapshot = list(self.connections.values()) for conn in connections_snapshot: await conn.close() self.connections.clear() def get(self, peer_id: str) ‑> Connection | None-
Expand source code
def get(self, peer_id: str) -> Connection | None: return self.connections.get(peer_id) def remove(self, peer_id: str) ‑> None-
Expand source code
def remove(self, peer_id: str) -> None: if peer_id in self.connections: del self.connections[peer_id]
class HiddenServiceListener (host: str = '127.0.0.1',
port: int = 0,
max_message_size: int = 2097152,
on_connection: Callable[[TCPConnection, str], Coroutine[Any, Any, None]] | None = None)-
Expand source code
class HiddenServiceListener: """ TCP listener for accepting direct peer connections via Tor hidden service. This is used by makers to accept direct connections from takers, bypassing the directory server for lower latency. """ def __init__( self, host: str = "127.0.0.1", port: int = 0, max_message_size: int = 2097152, on_connection: Callable[[TCPConnection, str], Coroutine[Any, Any, None]] | None = None, ): """ Initialize hidden service listener. Args: host: Local address to bind to (typically 127.0.0.1 for Tor) port: Local port to bind to (0 for auto-assign) max_message_size: Maximum message size in bytes on_connection: Callback when new connection is accepted """ self.host = host self.port = port self.max_message_size = max_message_size self.on_connection = on_connection self.server: asyncio.Server | None = None # type: ignore[no-any-unimported] self.running = False self._bound_port: int = 0 @property def bound_port(self) -> int: """Get the actual port the server is bound to.""" return self._bound_port async def start(self) -> int: """ Start listening for connections. Returns: The port number the server is bound to """ self.server = await asyncio.start_server( self._handle_connection, self.host, self.port, limit=self.max_message_size, ) self.running = True # Get the actual bound port addrs = self.server.sockets[0].getsockname() if self.server.sockets else None if addrs: self._bound_port = addrs[1] else: self._bound_port = self.port logger.info(f"Hidden service listener started on {self.host}:{self._bound_port}") return self._bound_port async def _handle_connection( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: """Handle incoming connection.""" peer_addr = writer.get_extra_info("peername") peer_str = f"{peer_addr[0]}:{peer_addr[1]}" if peer_addr else "unknown" logger.info(f"Accepted connection from {peer_str}") connection = TCPConnection(reader, writer, self.max_message_size) if self.on_connection: try: await self.on_connection(connection, peer_str) except Exception as e: logger.error(f"Error handling connection from {peer_str}: {e}") await connection.close() async def stop(self) -> None: """Stop the listener.""" self.running = False if self.server: self.server.close() await self.server.wait_closed() self.server = None logger.info("Hidden service listener stopped") async def serve_forever(self) -> None: """Run the server until stopped.""" if self.server: await self.server.serve_forever()TCP listener for accepting direct peer connections via Tor hidden service.
This is used by makers to accept direct connections from takers, bypassing the directory server for lower latency.
Initialize hidden service listener.
Args
host- Local address to bind to (typically 127.0.0.1 for Tor)
port- Local port to bind to (0 for auto-assign)
max_message_size- Maximum message size in bytes
on_connection- Callback when new connection is accepted
Instance variables
prop bound_port : int-
Expand source code
@property def bound_port(self) -> int: """Get the actual port the server is bound to.""" return self._bound_portGet the actual port the server is bound to.
Methods
async def serve_forever(self) ‑> None-
Expand source code
async def serve_forever(self) -> None: """Run the server until stopped.""" if self.server: await self.server.serve_forever()Run the server until stopped.
async def start(self) ‑> int-
Expand source code
async def start(self) -> int: """ Start listening for connections. Returns: The port number the server is bound to """ self.server = await asyncio.start_server( self._handle_connection, self.host, self.port, limit=self.max_message_size, ) self.running = True # Get the actual bound port addrs = self.server.sockets[0].getsockname() if self.server.sockets else None if addrs: self._bound_port = addrs[1] else: self._bound_port = self.port logger.info(f"Hidden service listener started on {self.host}:{self._bound_port}") return self._bound_portStart listening for connections.
Returns
The port number the server is bound to
async def stop(self) ‑> None-
Expand source code
async def stop(self) -> None: """Stop the listener.""" self.running = False if self.server: self.server.close() await self.server.wait_closed() self.server = None logger.info("Hidden service listener stopped")Stop the listener.
class TCPConnection (reader: asyncio.StreamReader,
writer: asyncio.StreamWriter,
max_message_size: int = 2097152)-
Expand source code
class TCPConnection(Connection): def __init__( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, max_message_size: int = 2097152, # 2MB ): self.reader = reader self.writer = writer self.max_message_size = max_message_size self._connected = True self._send_lock = asyncio.Lock() async def send(self, data: bytes) -> None: if not self._connected: raise ConnectionError("Connection closed") if len(data) > self.max_message_size: raise ValueError(f"Message too large: {len(data)} > {self.max_message_size}") async with self._send_lock: if not self._connected: raise ConnectionError("Connection closed") message_to_send = data + b"\r\n" logger.trace(f"TCPConnection.send: sending {len(message_to_send)} bytes") try: self.writer.write(message_to_send) await self.writer.drain() except (BrokenPipeError, ConnectionResetError, OSError) as e: self._connected = False raise ConnectionError(f"Send failed: {e}") from e async def receive(self) -> bytes: if not self._connected: raise ConnectionError("Connection closed") try: data = await self.reader.readuntil(b"\n") stripped = data.rstrip(b"\r\n") logger.trace(f"TCPConnection.receive: received {len(stripped)} bytes") return stripped except asyncio.LimitOverrunError as e: logger.error(f"Message too large (>{self.max_message_size} bytes)") raise ConnectionError("Message too large") from e except asyncio.IncompleteReadError as e: self._connected = False logger.trace("TCPConnection.receive: connection closed by peer") raise ConnectionError("Connection closed by peer") from e async def close(self) -> None: if not self._connected: return self._connected = False self.writer.close() await self.writer.wait_closed() def is_connected(self) -> bool: return self._connectedHelper class that provides a standard way to create an ABC using inheritance.
Ancestors
- Connection
- abc.ABC
Methods
async def close(self) ‑> None-
Expand source code
async def close(self) -> None: if not self._connected: return self._connected = False self.writer.close() await self.writer.wait_closed() def is_connected(self) ‑> bool-
Expand source code
def is_connected(self) -> bool: return self._connected async def receive(self) ‑> bytes-
Expand source code
async def receive(self) -> bytes: if not self._connected: raise ConnectionError("Connection closed") try: data = await self.reader.readuntil(b"\n") stripped = data.rstrip(b"\r\n") logger.trace(f"TCPConnection.receive: received {len(stripped)} bytes") return stripped except asyncio.LimitOverrunError as e: logger.error(f"Message too large (>{self.max_message_size} bytes)") raise ConnectionError("Message too large") from e except asyncio.IncompleteReadError as e: self._connected = False logger.trace("TCPConnection.receive: connection closed by peer") raise ConnectionError("Connection closed by peer") from e async def send(self, data: bytes) ‑> None-
Expand source code
async def send(self, data: bytes) -> None: if not self._connected: raise ConnectionError("Connection closed") if len(data) > self.max_message_size: raise ValueError(f"Message too large: {len(data)} > {self.max_message_size}") async with self._send_lock: if not self._connected: raise ConnectionError("Connection closed") message_to_send = data + b"\r\n" logger.trace(f"TCPConnection.send: sending {len(message_to_send)} bytes") try: self.writer.write(message_to_send) await self.writer.drain() except (BrokenPipeError, ConnectionResetError, OSError) as e: self._connected = False raise ConnectionError(f"Send failed: {e}") from e