115 lines
3.8 KiB
Python
115 lines
3.8 KiB
Python
"""Audio alarm functions — WAV loading and PCM streaming."""
|
|
|
|
import array
|
|
import asyncio
|
|
import json
|
|
import logging
|
|
import math
|
|
import wave
|
|
from pathlib import Path
|
|
|
|
log = logging.getLogger(__name__)
|
|
|
|
CHUNK_SIZE = 4096
|
|
AUDIO_DIR = Path(__file__).parent / "assets" / "alarm"
|
|
|
|
|
|
def find_wav(path: Path | None = None) -> Path:
|
|
"""Return a WAV file path.
|
|
|
|
If *path* is given and points to an existing file, use it directly.
|
|
Otherwise fall back to the first .wav found in the alarm assets directory.
|
|
"""
|
|
if path is not None:
|
|
p = Path(path)
|
|
if p.is_file():
|
|
log.info("Using audio file: %s", p)
|
|
return p
|
|
log.warning("Specified audio path not found: %s — falling back to glob", p)
|
|
|
|
wavs = list(AUDIO_DIR.glob("*.wav"))
|
|
if not wavs:
|
|
raise FileNotFoundError(f"No .wav files found in {AUDIO_DIR}")
|
|
log.info("Using audio file: %s", wavs[0].name)
|
|
return wavs[0]
|
|
|
|
|
|
def _normalize_pcm(pcm: bytes, bits: int) -> bytes:
|
|
"""Peak-normalize PCM data to 0 dBFS.
|
|
|
|
Supports 8-bit (unsigned) and 16-bit (signed) PCM.
|
|
Returns the original bytes unchanged if already at 0 dB or silent.
|
|
"""
|
|
if bits == 16:
|
|
samples = array.array("h", pcm) # signed 16-bit
|
|
peak = max(abs(s) for s in samples) if samples else 0
|
|
if peak == 0 or peak == 32767:
|
|
return pcm
|
|
scale = 32767 / peak
|
|
samples = array.array("h", (min(32767, max(-32768, int(s * scale))) for s in samples))
|
|
elif bits == 8:
|
|
samples = array.array("B", pcm) # unsigned 8-bit, center=128
|
|
peak = max(abs(s - 128) for s in samples) if samples else 0
|
|
if peak == 0 or peak == 127:
|
|
return pcm
|
|
scale = 127 / peak
|
|
samples = array.array("B", (max(0, min(255, int((s - 128) * scale) + 128)) for s in samples))
|
|
else:
|
|
log.warning("Normalization not supported for %d-bit audio, skipping", bits)
|
|
return pcm
|
|
|
|
gain_db = 20 * __import__("math").log10(scale) if scale > 0 else 0
|
|
log.info("Normalized: peak %d → 0 dBFS (gain %.1f dB)", peak, gain_db)
|
|
return samples.tobytes()
|
|
|
|
|
|
def read_wav(path: Path) -> tuple[bytes, int, int, int]:
|
|
"""Read WAV file, normalize to 0 dBFS, return (pcm_data, sample_rate, channels, bits)."""
|
|
try:
|
|
wf = wave.open(str(path), "rb")
|
|
except wave.Error as e:
|
|
raise ValueError(
|
|
f"{path.name}: unsupported WAV format ({e}). "
|
|
"Only 8/16-bit integer PCM is supported — no 32-bit float."
|
|
) from e
|
|
with wf:
|
|
sr = wf.getframerate()
|
|
ch = wf.getnchannels()
|
|
bits = wf.getsampwidth() * 8
|
|
pcm = wf.readframes(wf.getnframes())
|
|
log.info("WAV loaded: %dHz %dch %dbit, %.1fs, %d bytes",
|
|
sr, ch, bits, len(pcm) / (sr * ch * (bits // 8)), len(pcm))
|
|
pcm = _normalize_pcm(pcm, bits)
|
|
return pcm, sr, ch, bits
|
|
|
|
|
|
def chunk_bytes(data: bytes, size: int):
|
|
"""Yield data in fixed-size chunks."""
|
|
for i in range(0, len(data), size):
|
|
yield data[i : i + size]
|
|
|
|
|
|
async def stream_alarm(ws, pcm: bytes, sr: int, ch: int, bits: int):
|
|
"""Stream one alarm cycle to the connected client."""
|
|
bytes_per_sec = sr * ch * (bits // 8)
|
|
chunk_duration = CHUNK_SIZE / bytes_per_sec
|
|
pace_delay = chunk_duration * 0.9 # 90% real-time to avoid underrun
|
|
|
|
total_chunks = (len(pcm) + CHUNK_SIZE - 1) // CHUNK_SIZE
|
|
|
|
start_msg = json.dumps({
|
|
"type": "alarm_start",
|
|
"sample_rate": sr,
|
|
"channels": ch,
|
|
"bits": bits,
|
|
})
|
|
await ws.send(start_msg)
|
|
log.info("Sent alarm_start (%d chunks, pace %.1fms)", total_chunks, pace_delay * 1000)
|
|
|
|
for i, chunk in enumerate(chunk_bytes(pcm, CHUNK_SIZE)):
|
|
await ws.send(chunk)
|
|
await asyncio.sleep(pace_delay)
|
|
|
|
await ws.send(json.dumps({"type": "alarm_stop"}))
|
|
log.info("Sent alarm_stop")
|