Module directory_server.cli

CLI commands for directory server management.

Functions

def format_status_output(stats: dict) ‑> str
Expand source code
def format_status_output(stats: dict) -> str:
    lines = [
        "=== Directory Server Status ===",
        f"Network: {stats['network']}",
        f"Uptime: {stats['uptime_seconds']:.0f}s ({stats['uptime_seconds'] / 3600:.1f}h)",
        f"Status: {stats['server_status']}",
        f"Connected peers: {stats['connected_peers']['total']}/{stats['max_peers']}",
    ]

    if stats["connected_peers"]["nicks"]:
        lines.append(f"  Nicks: {', '.join(stats['connected_peers']['nicks'][:20])}")
        if len(stats["connected_peers"]["nicks"]) > 20:
            remaining = len(stats["connected_peers"]["nicks"]) - 20
            lines.append(f"  ... and {remaining} more")

    lines.extend(
        [
            f"Passive peers (orderbook watchers): {stats['passive_peers']['total']}",
        ]
    )

    if stats["passive_peers"]["nicks"]:
        lines.append(f"  Nicks: {', '.join(stats['passive_peers']['nicks'][:20])}")
        if len(stats["passive_peers"]["nicks"]) > 20:
            remaining = len(stats["passive_peers"]["nicks"]) - 20
            lines.append(f"  ... and {remaining} more")

    lines.extend(
        [
            f"Active peers (makers): {stats['active_peers']['total']}",
        ]
    )

    if stats["active_peers"]["nicks"]:
        lines.append(f"  Nicks: {', '.join(stats['active_peers']['nicks'][:20])}")
        if len(stats["active_peers"]["nicks"]) > 20:
            remaining = len(stats["active_peers"]["nicks"]) - 20
            lines.append(f"  ... and {remaining} more")

    lines.extend(
        [
            f"Active connections: {stats['active_connections']}",
        ]
    )

    # Add offer stats if available
    if "offers" in stats:
        offer_stats = stats["offers"]
        lines.extend(
            [
                "",
                "Offers:",
                f"  Total offers: {offer_stats['total_offers']}",
                f"  Peers with offers: {offer_stats['peers_with_offers']}",
            ]
        )

        # Show peers with more than 2 offers
        if offer_stats.get("peers_many_offers"):
            peers_many = offer_stats["peers_many_offers"]
            if peers_many:
                lines.append("  Peers with >2 offers:")
                for nick, count in peers_many:
                    lines.append(f"    {nick}: {count} offers")

    # Add rate limiter stats if available
    if "rate_limiter" in stats:
        rl_stats = stats["rate_limiter"]
        lines.extend(
            [
                "",
                "Rate Limiting:",
                f"  Tracked connections: {rl_stats['tracked_peers']}",
                f"  Total violations: {rl_stats['total_violations']}",
            ]
        )

        # Show top violators if any
        if rl_stats.get("top_violators"):
            top_violators = rl_stats["top_violators"][:5]  # Show top 5
            if top_violators:
                lines.append("  Top violators (by connection):")
                for conn_id, count in top_violators:
                    # Connection IDs are IP:port format - display as-is
                    lines.append(f"    {conn_id}: {count} violations")

    lines.append("===============================")

    return "\n".join(lines)
def health_command(args: argparse.Namespace) ‑> int
Expand source code
def health_command(args: argparse.Namespace) -> int:
    url = f"http://{args.host}:{args.port}/health"

    try:
        with urlopen(url, timeout=5) as response:
            data = json.loads(response.read().decode())

            if args.json:
                print(json.dumps(data, indent=2))
            else:
                status = data.get("status", "unknown")
                print(f"Server status: {status}")

            return 0 if data.get("status") == "healthy" else 1

    except URLError:
        print("Error: Server unhealthy or unreachable", file=sys.stderr)
        return 1
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1
def main() ‑> None
Expand source code
def main() -> None:
    parser = argparse.ArgumentParser(description="JoinMarket Directory Server CLI")
    parser.add_argument(
        "--host", default="127.0.0.1", help="Health check server host (default: 127.0.0.1)"
    )
    parser.add_argument(
        "--port", type=int, default=8080, help="Health check server port (default: 8080)"
    )

    subparsers = parser.add_subparsers(dest="command", help="Available commands")

    status_parser = subparsers.add_parser("status", help="Get server status")
    status_parser.add_argument("--json", action="store_true", help="Output as JSON")
    status_parser.set_defaults(func=status_command)

    health_parser = subparsers.add_parser("health", help="Check server health")
    health_parser.add_argument("--json", action="store_true", help="Output as JSON")
    health_parser.set_defaults(func=health_command)

    args = parser.parse_args()

    if not args.command:
        parser.print_help()
        sys.exit(1)

    sys.exit(args.func(args))
def status_command(args: argparse.Namespace) ‑> int
Expand source code
def status_command(args: argparse.Namespace) -> int:
    url = f"http://{args.host}:{args.port}/status"

    try:
        with urlopen(url, timeout=5) as response:
            data = json.loads(response.read().decode())

            if args.json:
                print(json.dumps(data, indent=2))
            else:
                print(format_status_output(data))

            return 0

    except URLError as e:
        print(f"Error: Could not connect to server at {url}", file=sys.stderr)
        print(f"Details: {e}", file=sys.stderr)
        return 1
    except json.JSONDecodeError as e:
        print("Error: Invalid JSON response from server", file=sys.stderr)
        print(f"Details: {e}", file=sys.stderr)
        return 1
    except Exception as e:
        print(f"Error: {e}", file=sys.stderr)
        return 1