config format fix and handling fix

multiple alarms supported
This commit is contained in:
Mikkeli Matlock
2026-02-15 22:46:59 +09:00
parent d9c5066c29
commit 73afaffeef
3 changed files with 103 additions and 74 deletions

View File

@@ -12,12 +12,53 @@ DEFAULT_CONFIG_PATH = Path(__file__).parent / "config" / "alarms.json"
VALID_DAYS = {"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"}
TIME_RE = re.compile(r"^([01]\d|2[0-3])[0-5]\d$")
DATE_RE = re.compile(r"^(0[1-9]|1[0-2])/(0[1-9]|[12]\d|3[01])$")
def load_config(path: Path) -> dict | None:
def _validate_entry(entry: dict, index: int) -> dict | None:
"""Validate a single alarm entry. Returns it if valid, None otherwise."""
if not isinstance(entry, dict):
log.warning("Alarm #%d: expected object, got %s", index, type(entry).__name__)
return None
alarm_time = entry.get("alarm_time")
if alarm_time is None:
log.warning("Alarm #%d: missing required field 'alarm_time'", index)
return None
if not isinstance(alarm_time, str) or not TIME_RE.match(alarm_time):
log.warning("Alarm #%d: invalid alarm_time '%s' — must be 4-digit HHMM", index, alarm_time)
return None
alarm_days = entry.get("alarm_days")
if alarm_days is not None:
if not isinstance(alarm_days, list) or not all(isinstance(d, str) for d in alarm_days):
log.warning("Alarm #%d: alarm_days must be a list of strings", index)
return None
bad = [d for d in alarm_days if d not in VALID_DAYS]
if bad:
log.warning("Alarm #%d: invalid day abbreviations: %s", index, bad)
return None
alarm_dates = entry.get("alarm_dates")
if alarm_dates is not None:
if not isinstance(alarm_dates, list) or not all(isinstance(d, str) for d in alarm_dates):
log.warning("Alarm #%d: alarm_dates must be a list of strings", index)
return None
bad = [d for d in alarm_dates if not DATE_RE.match(d)]
if bad:
log.warning("Alarm #%d: invalid date formats (expected MM/DD): %s", index, bad)
return None
log.info("Alarm #%d: time=%s days=%s", index, alarm_time, alarm_days or "(every day)")
return entry
def load_config(path: Path) -> list[dict] | None:
"""Read and validate alarm config JSON.
Returns the config dict, or None if the file is missing, empty, or invalid.
Accepts either a single alarm object or an array of alarm objects.
Returns a list of valid alarm dicts, or None if the file is missing,
empty, or contains no valid entries.
Never raises — logs warnings and returns None on any problem.
"""
try:
@@ -39,53 +80,35 @@ def load_config(path: Path) -> dict | None:
log.warning("Invalid JSON in %s: %s", path, e)
return None
# Empty object or empty list means "no alarms"
if not data or (isinstance(data, list) and len(data) == 0):
if not data:
log.info("Config is empty — no alarms configured")
return None
if not isinstance(data, dict):
log.warning("Config must be a JSON object, got %s", type(data).__name__)
# Normalize to list
if isinstance(data, dict):
entries = [data]
elif isinstance(data, list):
entries = data
else:
log.warning("Config must be a JSON object or array, got %s", type(data).__name__)
return None
# Validate alarm_time (required)
alarm_time = data.get("alarm_time")
if alarm_time is None:
log.warning("Missing required field 'alarm_time'")
return None
if not isinstance(alarm_time, str) or not TIME_RE.match(alarm_time):
log.warning("Invalid alarm_time '%s' — must be 4-digit HHMM (e.g. '0730')", alarm_time)
valid = []
for i, entry in enumerate(entries):
result = _validate_entry(entry, i)
if result is not None:
valid.append(result)
if not valid:
log.warning("No valid alarm entries in %s", path)
return None
# Validate alarm_days (optional)
alarm_days = data.get("alarm_days")
if alarm_days is not None:
if not isinstance(alarm_days, list) or not all(isinstance(d, str) for d in alarm_days):
log.warning("alarm_days must be a list of strings")
return None
bad = [d for d in alarm_days if d not in VALID_DAYS]
if bad:
log.warning("Invalid day abbreviations in alarm_days: %s", bad)
return None
# Validate alarm_dates (optional)
alarm_dates = data.get("alarm_dates")
if alarm_dates is not None:
if not isinstance(alarm_dates, list) or not all(isinstance(d, str) for d in alarm_dates):
log.warning("alarm_dates must be a list of strings")
return None
date_re = re.compile(r"^(0[1-9]|1[0-2])/(0[1-9]|[12]\d|3[01])$")
bad = [d for d in alarm_dates if not date_re.match(d)]
if bad:
log.warning("Invalid date formats in alarm_dates (expected MM/DD): %s", bad)
return None
log.info("Alarm config loaded: time=%s days=%s", alarm_time, alarm_days or "(every day)")
return data
log.info("Loaded %d alarm(s) from %s", len(valid), path)
return valid
def should_fire(config: dict) -> bool:
"""Check if the alarm should fire right now based on config schedule.
"""Check if a single alarm entry should fire right now.
Rules:
- alarm_time must match current HHMM
@@ -103,12 +126,10 @@ def should_fire(config: dict) -> bool:
alarm_days = config.get("alarm_days")
alarm_dates = config.get("alarm_dates")
# alarm_days takes priority over alarm_dates
if alarm_days is not None:
return now.strftime("%a") in alarm_days
if alarm_dates is not None:
return now.strftime("%m/%d") in alarm_dates
# Neither specified — fire every day
return True

View File

@@ -1,11 +1,13 @@
{
"alarm_time": "0730",
"alarm_days": ["Mon", "Tue", "Wed", "Thu", "Fri"],
"alarm_audio": "assets/alarm/alarm_test.wav",
"alarm_image": "assets/img/on_alarm.png"
},
{
"alarm_time": "2300",
"alarm_audio": "assets/alarm/sleep.wav",
"alarm_image": "assets/img/sleep.png"
}
[
{
"alarm_time": "0730",
"alarm_days": ["Mon", "Tue", "Wed", "Thu", "Fri"],
"alarm_audio": "assets/alarm/alarm_test.wav",
"alarm_image": "assets/img/on_alarm.png"
},
{
"alarm_time": "2250",
"alarm_audio": "assets/alarm/sleep.wav",
"alarm_image": "assets/img/sleep.png"
}
]

View File

@@ -47,45 +47,51 @@ def _resolve_path(relative: str) -> Path:
return p
def _prepare_alarm(entry: dict) -> dict:
"""Pre-resolve paths and load resources for a single alarm entry."""
audio_path = find_wav(_resolve_path(entry.get("alarm_audio", "assets/alarm/alarm_test.wav")))
alarm_img_path = _resolve_path(entry.get("alarm_image", "assets/img/on_alarm.png"))
pcm, sr, ch, bits = read_wav(audio_path)
img = load_status_image(alarm_img_path)
return {
"config": entry,
"pcm": pcm, "sr": sr, "ch": ch, "bits": bits,
"img": img,
"last_fired": None,
}
async def handler(ws):
"""Handle a single WebSocket connection."""
remote = ws.remote_address
log.info("Client connected: %s:%d", remote[0], remote[1])
config = load_config(_config_path)
# Resolve audio and image paths from config (or defaults)
if config:
audio_path = find_wav(_resolve_path(config.get("alarm_audio", "assets/alarm/alarm_test.wav")))
alarm_img_path = _resolve_path(config.get("alarm_image", "assets/img/on_alarm.png"))
else:
audio_path = None
alarm_img_path = IMG_DIR / "on_alarm.png"
configs = load_config(_config_path)
img_idle = load_status_image(IMG_DIR / "idle.png")
img_alarm = load_status_image(alarm_img_path)
try:
await send_status_image(ws, img_idle)
if not config:
if not configs:
log.info("No alarms configured — idling forever")
await asyncio.Future()
return
pcm, sr, ch, bits = read_wav(audio_path)
last_fired_minute = None
alarms = [_prepare_alarm(entry) for entry in configs]
while True:
if should_fire(config):
current_minute = datetime.now().strftime("%Y%m%d%H%M")
for alarm in alarms:
if should_fire(alarm["config"]):
current_minute = datetime.now().strftime("%Y%m%d%H%M")
if current_minute != last_fired_minute:
last_fired_minute = current_minute
log.info("Alarm firing at %s", current_minute)
await send_status_image(ws, img_alarm)
await stream_alarm(ws, pcm, sr, ch, bits)
await send_status_image(ws, img_idle)
if current_minute != alarm["last_fired"]:
alarm["last_fired"] = current_minute
log.info("Alarm firing: %s at %s",
alarm["config"]["alarm_time"], current_minute)
await send_status_image(ws, alarm["img"])
await stream_alarm(ws, alarm["pcm"], alarm["sr"],
alarm["ch"], alarm["bits"])
await send_status_image(ws, img_idle)
await asyncio.sleep(TICK_INTERVAL)