""" Alarm audio streaming test server. Streams a WAV file as raw PCM chunks over WebSocket on port 8766. Repeats every 30-60 seconds to exercise the ESP32 audio pipeline. Protocol: 1. Text frame: {"type":"alarm_start","sample_rate":N,"channels":N,"bits":N} 2. Binary frames: raw PCM chunks (4096 bytes each, paced at ~90% real-time) 3. Text frame: {"type":"alarm_stop"} """ import asyncio import json import logging import struct import wave from pathlib import Path from random import randint import websockets from PIL import Image logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s") log = logging.getLogger("audio_server") PORT = 8766 CHUNK_SIZE = 4096 AUDIO_DIR = Path(__file__).parent / "assets" / "alarm" IMG_DIR = Path(__file__).parent / "assets" / "img" STATUS_IMG_SIZE = 120 def find_wav() -> Path: """Find the first .wav file in the alarm assets directory.""" wavs = list(AUDIO_DIR.glob("*.wav")) if not wavs: raise FileNotFoundError(f"No .wav files found in {AUDIO_DIR}") log.info("Using audio file: %s", wavs[0].name) return wavs[0] def read_wav(path: Path) -> tuple[bytes, int, int, int]: """Read WAV file and return (pcm_data, sample_rate, channels, bits_per_sample).""" with wave.open(str(path), "rb") as wf: sr = wf.getframerate() ch = wf.getnchannels() bits = wf.getsampwidth() * 8 pcm = wf.readframes(wf.getnframes()) log.info("WAV loaded: %dHz %dch %dbit, %.1fs, %d bytes", sr, ch, bits, len(pcm) / (sr * ch * (bits // 8)), len(pcm)) return pcm, sr, ch, bits def load_status_image(path: Path) -> bytes: """Load a PNG, convert to 1-bit 120x120 monochrome bitmap (MSB-first, black=1).""" img = Image.open(path).convert("L") # Resize to fit within 120x120, preserving aspect ratio img.thumbnail((STATUS_IMG_SIZE, STATUS_IMG_SIZE), Image.LANCZOS) # Paste centered onto white canvas canvas = Image.new("L", (STATUS_IMG_SIZE, STATUS_IMG_SIZE), 255) x_off = (STATUS_IMG_SIZE - img.width) // 2 y_off = (STATUS_IMG_SIZE - img.height) // 2 canvas.paste(img, (x_off, y_off)) # Threshold to 1-bit: black (< 128) → 1, white → 0 bw = canvas.point(lambda p: 1 if p < 128 else 0, "1") raw = bw.tobytes() log.info("Status image loaded: %s → %d bytes", path.name, len(raw)) return raw async def send_status_image(ws, img_bytes: bytes): """Send a status image over the WebSocket (text header + binary payload).""" header = json.dumps({"type": "status_image", "width": STATUS_IMG_SIZE, "height": STATUS_IMG_SIZE}) await ws.send(header) await ws.send(img_bytes) log.info("Sent status image (%d bytes)", len(img_bytes)) def chunk_bytes(data: bytes, size: int): """Yield data in fixed-size chunks.""" for i in range(0, len(data), size): yield data[i : i + size] async def stream_alarm(ws, pcm: bytes, sr: int, ch: int, bits: int): """Stream one alarm cycle to the connected client.""" # Compute pacing: how long each chunk represents in seconds bytes_per_sec = sr * ch * (bits // 8) chunk_duration = CHUNK_SIZE / bytes_per_sec pace_delay = chunk_duration * 0.9 # 90% real-time to avoid underrun total_chunks = (len(pcm) + CHUNK_SIZE - 1) // CHUNK_SIZE # Start start_msg = json.dumps({ "type": "alarm_start", "sample_rate": sr, "channels": ch, "bits": bits, }) await ws.send(start_msg) log.info("Sent alarm_start (%d chunks, pace %.1fms)", total_chunks, pace_delay * 1000) # Stream PCM chunks for i, chunk in enumerate(chunk_bytes(pcm, CHUNK_SIZE)): await ws.send(chunk) # bytes → binary frame await asyncio.sleep(pace_delay) # Stop await ws.send(json.dumps({"type": "alarm_stop"})) log.info("Sent alarm_stop") async def handler(ws): """Handle a single WebSocket connection.""" remote = ws.remote_address log.info("Client connected: %s:%d", remote[0], remote[1]) wav_path = find_wav() pcm, sr, ch, bits = read_wav(wav_path) # Load status images img_idle = load_status_image(IMG_DIR / "idle.png") img_alarm = load_status_image(IMG_DIR / "on_alarm.png") try: # Send idle image on connect await send_status_image(ws, img_idle) while True: delay = randint(30, 60) log.info("Next alarm in %ds", delay) await asyncio.sleep(delay) # Switch to alarm image before audio await send_status_image(ws, img_alarm) await stream_alarm(ws, pcm, sr, ch, bits) # Switch back to idle after alarm await send_status_image(ws, img_idle) except websockets.exceptions.ConnectionClosed: log.info("Client disconnected: %s:%d", remote[0], remote[1]) async def main(): log.info("Audio server starting on port %d", PORT) async with websockets.serve(handler, "0.0.0.0", PORT): await asyncio.Future() # run forever if __name__ == "__main__": main_loop = asyncio.run(main())