alarm audio normalisation

This commit is contained in:
Mikkeli Matlock
2026-02-15 23:12:25 +09:00
parent 76bf8c966d
commit a25e4b2cb3
2 changed files with 42 additions and 3 deletions

View File

@@ -1,8 +1,10 @@
"""Audio alarm functions — WAV loading and PCM streaming."""
import array
import asyncio
import json
import logging
import math
import wave
from pathlib import Path
@@ -32,15 +34,52 @@ def find_wav(path: Path | None = None) -> Path:
return wavs[0]
def _normalize_pcm(pcm: bytes, bits: int) -> bytes:
"""Peak-normalize PCM data to 0 dBFS.
Supports 8-bit (unsigned) and 16-bit (signed) PCM.
Returns the original bytes unchanged if already at 0 dB or silent.
"""
if bits == 16:
samples = array.array("h", pcm) # signed 16-bit
peak = max(abs(s) for s in samples) if samples else 0
if peak == 0 or peak == 32767:
return pcm
scale = 32767 / peak
samples = array.array("h", (min(32767, max(-32768, int(s * scale))) for s in samples))
elif bits == 8:
samples = array.array("B", pcm) # unsigned 8-bit, center=128
peak = max(abs(s - 128) for s in samples) if samples else 0
if peak == 0 or peak == 127:
return pcm
scale = 127 / peak
samples = array.array("B", (max(0, min(255, int((s - 128) * scale) + 128)) for s in samples))
else:
log.warning("Normalization not supported for %d-bit audio, skipping", bits)
return pcm
gain_db = 20 * __import__("math").log10(scale) if scale > 0 else 0
log.info("Normalized: peak %d → 0 dBFS (gain %.1f dB)", peak, gain_db)
return samples.tobytes()
def read_wav(path: Path) -> tuple[bytes, int, int, int]:
"""Read WAV file and return (pcm_data, sample_rate, channels, bits_per_sample)."""
with wave.open(str(path), "rb") as wf:
"""Read WAV file, normalize to 0 dBFS, return (pcm_data, sample_rate, channels, bits)."""
try:
wf = wave.open(str(path), "rb")
except wave.Error as e:
raise ValueError(
f"{path.name}: unsupported WAV format ({e}). "
"Only 8/16-bit integer PCM is supported — no 32-bit float."
) from e
with wf:
sr = wf.getframerate()
ch = wf.getnchannels()
bits = wf.getsampwidth() * 8
pcm = wf.readframes(wf.getnframes())
log.info("WAV loaded: %dHz %dch %dbit, %.1fs, %d bytes",
sr, ch, bits, len(pcm) / (sr * ch * (bits // 8)), len(pcm))
pcm = _normalize_pcm(pcm, bits)
return pcm, sr, ch, bits