53 lines
1.6 KiB
Python
53 lines
1.6 KiB
Python
|
|
#!/usr/bin/env python3
|
||
|
|
"""Mock WebSocket server that sends randomized Pi stats every 2 seconds."""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import json
|
||
|
|
import random
|
||
|
|
import time
|
||
|
|
|
||
|
|
import websockets
|
||
|
|
|
||
|
|
|
||
|
|
def generate_stats():
|
||
|
|
services = [
|
||
|
|
{"name": "docker", "status": random.choice(["running", "running", "running", "stopped"])},
|
||
|
|
{"name": "pihole", "status": random.choice(["running", "running", "running", "stopped"])},
|
||
|
|
{"name": "nginx", "status": random.choice(["running", "running", "stopped"])},
|
||
|
|
{"name": "sshd", "status": "running"},
|
||
|
|
]
|
||
|
|
return {
|
||
|
|
"cpu_pct": round(random.uniform(5, 95), 1),
|
||
|
|
"mem_pct": round(random.uniform(30, 85), 1),
|
||
|
|
"mem_used_mb": random.randint(512, 3200),
|
||
|
|
"disk_pct": round(random.uniform(20, 80), 1),
|
||
|
|
"cpu_temp": round(random.uniform(35, 78), 1),
|
||
|
|
"uptime_hrs": round(random.uniform(1, 2000), 1),
|
||
|
|
"net_rx_kbps": round(random.uniform(0, 5000), 1),
|
||
|
|
"net_tx_kbps": round(random.uniform(0, 2000), 1),
|
||
|
|
"services": services,
|
||
|
|
"timestamp": int(time.time()),
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
async def handler(websocket):
|
||
|
|
addr = websocket.remote_address
|
||
|
|
print(f"Client connected: {addr}")
|
||
|
|
try:
|
||
|
|
while True:
|
||
|
|
stats = generate_stats()
|
||
|
|
await websocket.send(json.dumps(stats))
|
||
|
|
await asyncio.sleep(2)
|
||
|
|
except websockets.ConnectionClosed:
|
||
|
|
print(f"Client disconnected: {addr}")
|
||
|
|
|
||
|
|
|
||
|
|
async def main():
|
||
|
|
print("Mock Pi stats server starting on ws://0.0.0.0:8765")
|
||
|
|
async with websockets.serve(handler, "0.0.0.0", 8765):
|
||
|
|
await asyncio.Future() # run forever
|
||
|
|
|
||
|
|
|
||
|
|
if __name__ == "__main__":
|
||
|
|
asyncio.run(main())
|