#!/usr/bin/env python3
"""WebSocket server that sends real Pi system stats every 2 seconds.

Drop-in replacement for mock_server.py. Same port (8765), same JSON schema,
same 2s push interval. Services remain mocked until systemd integration is
added.
"""

import asyncio
import json
import random
import time
from datetime import datetime
from pathlib import Path

import psutil
import websockets

HOST = "0.0.0.0"
PORT = 8765
PUSH_INTERVAL_S = 2

# CPU % and network throughput are deltas since the previous sample, and the
# sampling state below is shared by every connected client. Each client runs
# its own push loop, so without a floor on the sampling interval two clients
# would keep shrinking each other's measurement window and produce noisy
# readings. Samples requested sooner than this reuse the previous result.
_MIN_SAMPLE_INTERVAL_S = 1.0

# Sysfs fallback for the CPU temperature (value is in millidegrees).
_THERMAL_ZONE_PATH = Path("/sys/class/thermal/thermal_zone0/temp")

# Prime the CPU percent counter (first call always returns 0.0)
psutil.cpu_percent(interval=None)
_last_cpu_pct = 0.0
_last_cpu_time = time.monotonic()

# Network baseline for delta calculation
_prev_net = psutil.net_io_counters()
_prev_net_time = time.monotonic()
_last_net_kbps = (0.0, 0.0)


def _get_cpu_temp() -> float:
    """Read CPU temperature with fallback for different Pi OS versions.

    Tries the psutil "cpu_thermal" sensor first, then reads the sysfs thermal
    zone directly.

    Returns:
        Temperature rounded to one decimal, or 0.0 when no source is readable.
    """
    try:
        readings = psutil.sensors_temperatures().get("cpu_thermal")
    except (AttributeError, OSError):
        # AttributeError: sensors_temperatures is missing on this platform.
        readings = None
    if readings:
        return round(readings[0].current, 1)

    # Fallback: read sysfs directly (value is in millidegrees). Any OSError
    # (missing file, permissions, driver read error) means "no reading"
    # rather than a crash of the client's push loop.
    try:
        millidegrees = int(_THERMAL_ZONE_PATH.read_text().strip())
    except (OSError, ValueError):
        return 0.0
    return round(millidegrees / 1000.0, 1)


def _get_cpu_percent() -> float:
    """Return system CPU utilisation since the previous sample.

    Re-samples only when at least ``_MIN_SAMPLE_INTERVAL_S`` has passed since
    the last sample; otherwise the previous value is returned so that
    concurrent clients do not shorten each other's measurement window.
    """
    global _last_cpu_pct, _last_cpu_time
    now = time.monotonic()
    if now - _last_cpu_time >= _MIN_SAMPLE_INTERVAL_S:
        _last_cpu_pct = psutil.cpu_percent(interval=None)
        _last_cpu_time = now
    return _last_cpu_pct


def _get_net_throughput() -> tuple[float, float]:
    """Calculate network rx/tx in kbps since the last sample.

    Returns:
        ``(rx_kbps, tx_kbps)`` rounded to one decimal. When called again
        within ``_MIN_SAMPLE_INTERVAL_S`` of the previous sample, the
        previously computed pair is returned and the baseline is left alone.
    """
    global _prev_net, _prev_net_time, _last_net_kbps
    now = time.monotonic()
    elapsed = now - _prev_net_time
    if elapsed < _MIN_SAMPLE_INTERVAL_S:
        return _last_net_kbps

    current = psutil.net_io_counters()
    # Clamp at zero so a decrease in the totals never shows up as negative
    # throughput.
    rx_bytes = max(current.bytes_recv - _prev_net.bytes_recv, 0)
    tx_bytes = max(current.bytes_sent - _prev_net.bytes_sent, 0)
    # bytes -> bits (* 8), per second (/ elapsed), bits -> kilobits (/ 1000)
    rx_kbps = round(rx_bytes * 8 / (elapsed * 1000), 1)
    tx_kbps = round(tx_bytes * 8 / (elapsed * 1000), 1)

    _prev_net = current
    _prev_net_time = now
    _last_net_kbps = (rx_kbps, tx_kbps)
    return _last_net_kbps


def _mock_services() -> list[dict]:
    """Mocked service status — same logic as mock_server.py."""
    # Repeated "running" entries weight the random pick towards running.
    mostly_running = ["running", "running", "running", "stopped"]
    return [
        {"name": "docker", "status": random.choice(mostly_running)},
        {"name": "pihole", "status": random.choice(mostly_running)},
        {"name": "nginx", "status": random.choice(["running", "running", "stopped"])},
        {"name": "sshd", "status": "running"},
    ]


def _local_time_fields() -> dict:
    """Current local time as broken-down fields for RTC sync."""
    now = datetime.now()
    return {
        "y": now.year,
        "mo": now.month,
        "d": now.day,
        "h": now.hour,
        "m": now.minute,
        "s": now.second,
    }


def generate_stats() -> dict:
    """Collect one stats snapshot in the JSON schema pushed to clients."""
    mem = psutil.virtual_memory()
    disk = psutil.disk_usage("/")
    rx_kbps, tx_kbps = _get_net_throughput()
    return {
        "cpu_pct": _get_cpu_percent(),
        "mem_pct": round(mem.percent, 1),
        "mem_used_mb": int(mem.used // (1024 * 1024)),
        "disk_pct": round(disk.percent, 1),
        "cpu_temp": _get_cpu_temp(),
        "uptime_hrs": round((time.time() - psutil.boot_time()) / 3600, 1),
        "net_rx_kbps": rx_kbps,
        "net_tx_kbps": tx_kbps,
        "services": _mock_services(),
        "timestamp": int(time.time()),
        "local_time": _local_time_fields(),
    }


async def handler(websocket):
    """Push a stats snapshot to one client every PUSH_INTERVAL_S seconds.

    Runs until the connection closes.
    """
    addr = websocket.remote_address
    print(f"Client connected: {addr}")
    try:
        while True:
            stats = generate_stats()
            await websocket.send(json.dumps(stats))
            await asyncio.sleep(PUSH_INTERVAL_S)
    except websockets.ConnectionClosed:
        print(f"Client disconnected: {addr}")


async def main():
    """Start the WebSocket server and serve until the process is stopped."""
    print(f"Pi stats server starting on ws://{HOST}:{PORT}")
    async with websockets.serve(handler, HOST, PORT):
        await asyncio.Future()  # run forever


if __name__ == "__main__":
    asyncio.run(main())