#!/usr/bin/env python3 """Mock WebSocket server that sends randomized Pi stats every 2 seconds.""" import asyncio import json import random import time import websockets def generate_stats(): services = [ {"name": "docker", "status": random.choice(["running", "running", "running", "stopped"])}, {"name": "pihole", "status": random.choice(["running", "running", "running", "stopped"])}, {"name": "nginx", "status": random.choice(["running", "running", "stopped"])}, {"name": "sshd", "status": "running"}, ] return { "cpu_pct": round(random.uniform(5, 95), 1), "mem_pct": round(random.uniform(30, 85), 1), "mem_used_mb": random.randint(512, 3200), "disk_pct": round(random.uniform(20, 80), 1), "cpu_temp": round(random.uniform(35, 78), 1), "uptime_hrs": round(random.uniform(1, 2000), 1), "net_rx_kbps": round(random.uniform(0, 5000), 1), "net_tx_kbps": round(random.uniform(0, 2000), 1), "services": services, "timestamp": int(time.time()), } async def handler(websocket): addr = websocket.remote_address print(f"Client connected: {addr}") try: while True: stats = generate_stats() await websocket.send(json.dumps(stats)) await asyncio.sleep(2) except websockets.ConnectionClosed: print(f"Client disconnected: {addr}") async def main(): print("Mock Pi stats server starting on ws://0.0.0.0:8765") async with websockets.serve(handler, "0.0.0.0", 8765): await asyncio.Future() # run forever if __name__ == "__main__": asyncio.run(main())