"""Audio alarm functions — WAV loading and PCM streaming.""" import array import asyncio import json import logging import math import wave from pathlib import Path log = logging.getLogger(__name__) CHUNK_SIZE = 4096 AUDIO_DIR = Path(__file__).parent / "assets" / "alarm" def find_wav(path: Path | None = None) -> Path: """Return a WAV file path. If *path* is given and points to an existing file, use it directly. Otherwise fall back to the first .wav found in the alarm assets directory. """ if path is not None: p = Path(path) if p.is_file(): log.info("Using audio file: %s", p) return p log.warning("Specified audio path not found: %s — falling back to glob", p) wavs = list(AUDIO_DIR.glob("*.wav")) if not wavs: raise FileNotFoundError(f"No .wav files found in {AUDIO_DIR}") log.info("Using audio file: %s", wavs[0].name) return wavs[0] def _normalize_pcm(pcm: bytes, bits: int) -> bytes: """Peak-normalize PCM data to 0 dBFS. Supports 8-bit (unsigned) and 16-bit (signed) PCM. Returns the original bytes unchanged if already at 0 dB or silent. """ if bits == 16: samples = array.array("h", pcm) # signed 16-bit peak = max(abs(s) for s in samples) if samples else 0 if peak == 0 or peak == 32767: return pcm scale = 32767 / peak samples = array.array("h", (min(32767, max(-32768, int(s * scale))) for s in samples)) elif bits == 8: samples = array.array("B", pcm) # unsigned 8-bit, center=128 peak = max(abs(s - 128) for s in samples) if samples else 0 if peak == 0 or peak == 127: return pcm scale = 127 / peak samples = array.array("B", (max(0, min(255, int((s - 128) * scale) + 128)) for s in samples)) else: log.warning("Normalization not supported for %d-bit audio, skipping", bits) return pcm gain_db = 20 * __import__("math").log10(scale) if scale > 0 else 0 log.info("Normalized: peak %d → 0 dBFS (gain %.1f dB)", peak, gain_db) return samples.tobytes() def read_wav(path: Path) -> tuple[bytes, int, int, int]: """Read WAV file, normalize to 0 dBFS, return (pcm_data, sample_rate, channels, bits).""" try: wf = wave.open(str(path), "rb") except wave.Error as e: raise ValueError( f"{path.name}: unsupported WAV format ({e}). " "Only 8/16-bit integer PCM is supported — no 32-bit float." ) from e with wf: sr = wf.getframerate() ch = wf.getnchannels() bits = wf.getsampwidth() * 8 pcm = wf.readframes(wf.getnframes()) log.info("WAV loaded: %dHz %dch %dbit, %.1fs, %d bytes", sr, ch, bits, len(pcm) / (sr * ch * (bits // 8)), len(pcm)) pcm = _normalize_pcm(pcm, bits) return pcm, sr, ch, bits def chunk_bytes(data: bytes, size: int): """Yield data in fixed-size chunks.""" for i in range(0, len(data), size): yield data[i : i + size] async def stream_alarm(ws, pcm: bytes, sr: int, ch: int, bits: int): """Stream one alarm cycle to the connected client.""" bytes_per_sec = sr * ch * (bits // 8) chunk_duration = CHUNK_SIZE / bytes_per_sec pace_delay = chunk_duration * 0.9 # 90% real-time to avoid underrun total_chunks = (len(pcm) + CHUNK_SIZE - 1) // CHUNK_SIZE start_msg = json.dumps({ "type": "alarm_start", "sample_rate": sr, "channels": ch, "bits": bits, }) await ws.send(start_msg) log.info("Sent alarm_start (%d chunks, pace %.1fms)", total_chunks, pace_delay * 1000) for i, chunk in enumerate(chunk_bytes(pcm, CHUNK_SIZE)): await ws.send(chunk) await asyncio.sleep(pace_delay) await ws.send(json.dumps({"type": "alarm_stop"})) log.info("Sent alarm_stop")