config format fix and handling fix
multiple alarms supported
This commit is contained in:
@@ -12,12 +12,53 @@ DEFAULT_CONFIG_PATH = Path(__file__).parent / "config" / "alarms.json"
|
|||||||
|
|
||||||
VALID_DAYS = {"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"}
|
VALID_DAYS = {"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"}
|
||||||
TIME_RE = re.compile(r"^([01]\d|2[0-3])[0-5]\d$")
|
TIME_RE = re.compile(r"^([01]\d|2[0-3])[0-5]\d$")
|
||||||
|
DATE_RE = re.compile(r"^(0[1-9]|1[0-2])/(0[1-9]|[12]\d|3[01])$")
|
||||||
|
|
||||||
|
|
||||||
def load_config(path: Path) -> dict | None:
|
def _validate_entry(entry: dict, index: int) -> dict | None:
|
||||||
|
"""Validate a single alarm entry. Returns it if valid, None otherwise."""
|
||||||
|
if not isinstance(entry, dict):
|
||||||
|
log.warning("Alarm #%d: expected object, got %s", index, type(entry).__name__)
|
||||||
|
return None
|
||||||
|
|
||||||
|
alarm_time = entry.get("alarm_time")
|
||||||
|
if alarm_time is None:
|
||||||
|
log.warning("Alarm #%d: missing required field 'alarm_time'", index)
|
||||||
|
return None
|
||||||
|
if not isinstance(alarm_time, str) or not TIME_RE.match(alarm_time):
|
||||||
|
log.warning("Alarm #%d: invalid alarm_time '%s' — must be 4-digit HHMM", index, alarm_time)
|
||||||
|
return None
|
||||||
|
|
||||||
|
alarm_days = entry.get("alarm_days")
|
||||||
|
if alarm_days is not None:
|
||||||
|
if not isinstance(alarm_days, list) or not all(isinstance(d, str) for d in alarm_days):
|
||||||
|
log.warning("Alarm #%d: alarm_days must be a list of strings", index)
|
||||||
|
return None
|
||||||
|
bad = [d for d in alarm_days if d not in VALID_DAYS]
|
||||||
|
if bad:
|
||||||
|
log.warning("Alarm #%d: invalid day abbreviations: %s", index, bad)
|
||||||
|
return None
|
||||||
|
|
||||||
|
alarm_dates = entry.get("alarm_dates")
|
||||||
|
if alarm_dates is not None:
|
||||||
|
if not isinstance(alarm_dates, list) or not all(isinstance(d, str) for d in alarm_dates):
|
||||||
|
log.warning("Alarm #%d: alarm_dates must be a list of strings", index)
|
||||||
|
return None
|
||||||
|
bad = [d for d in alarm_dates if not DATE_RE.match(d)]
|
||||||
|
if bad:
|
||||||
|
log.warning("Alarm #%d: invalid date formats (expected MM/DD): %s", index, bad)
|
||||||
|
return None
|
||||||
|
|
||||||
|
log.info("Alarm #%d: time=%s days=%s", index, alarm_time, alarm_days or "(every day)")
|
||||||
|
return entry
|
||||||
|
|
||||||
|
|
||||||
|
def load_config(path: Path) -> list[dict] | None:
|
||||||
"""Read and validate alarm config JSON.
|
"""Read and validate alarm config JSON.
|
||||||
|
|
||||||
Returns the config dict, or None if the file is missing, empty, or invalid.
|
Accepts either a single alarm object or an array of alarm objects.
|
||||||
|
Returns a list of valid alarm dicts, or None if the file is missing,
|
||||||
|
empty, or contains no valid entries.
|
||||||
Never raises — logs warnings and returns None on any problem.
|
Never raises — logs warnings and returns None on any problem.
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
@@ -39,53 +80,35 @@ def load_config(path: Path) -> dict | None:
|
|||||||
log.warning("Invalid JSON in %s: %s", path, e)
|
log.warning("Invalid JSON in %s: %s", path, e)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Empty object or empty list means "no alarms"
|
if not data:
|
||||||
if not data or (isinstance(data, list) and len(data) == 0):
|
|
||||||
log.info("Config is empty — no alarms configured")
|
log.info("Config is empty — no alarms configured")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if not isinstance(data, dict):
|
# Normalize to list
|
||||||
log.warning("Config must be a JSON object, got %s", type(data).__name__)
|
if isinstance(data, dict):
|
||||||
|
entries = [data]
|
||||||
|
elif isinstance(data, list):
|
||||||
|
entries = data
|
||||||
|
else:
|
||||||
|
log.warning("Config must be a JSON object or array, got %s", type(data).__name__)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Validate alarm_time (required)
|
valid = []
|
||||||
alarm_time = data.get("alarm_time")
|
for i, entry in enumerate(entries):
|
||||||
if alarm_time is None:
|
result = _validate_entry(entry, i)
|
||||||
log.warning("Missing required field 'alarm_time'")
|
if result is not None:
|
||||||
return None
|
valid.append(result)
|
||||||
if not isinstance(alarm_time, str) or not TIME_RE.match(alarm_time):
|
|
||||||
log.warning("Invalid alarm_time '%s' — must be 4-digit HHMM (e.g. '0730')", alarm_time)
|
if not valid:
|
||||||
|
log.warning("No valid alarm entries in %s", path)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
# Validate alarm_days (optional)
|
log.info("Loaded %d alarm(s) from %s", len(valid), path)
|
||||||
alarm_days = data.get("alarm_days")
|
return valid
|
||||||
if alarm_days is not None:
|
|
||||||
if not isinstance(alarm_days, list) or not all(isinstance(d, str) for d in alarm_days):
|
|
||||||
log.warning("alarm_days must be a list of strings")
|
|
||||||
return None
|
|
||||||
bad = [d for d in alarm_days if d not in VALID_DAYS]
|
|
||||||
if bad:
|
|
||||||
log.warning("Invalid day abbreviations in alarm_days: %s", bad)
|
|
||||||
return None
|
|
||||||
|
|
||||||
# Validate alarm_dates (optional)
|
|
||||||
alarm_dates = data.get("alarm_dates")
|
|
||||||
if alarm_dates is not None:
|
|
||||||
if not isinstance(alarm_dates, list) or not all(isinstance(d, str) for d in alarm_dates):
|
|
||||||
log.warning("alarm_dates must be a list of strings")
|
|
||||||
return None
|
|
||||||
date_re = re.compile(r"^(0[1-9]|1[0-2])/(0[1-9]|[12]\d|3[01])$")
|
|
||||||
bad = [d for d in alarm_dates if not date_re.match(d)]
|
|
||||||
if bad:
|
|
||||||
log.warning("Invalid date formats in alarm_dates (expected MM/DD): %s", bad)
|
|
||||||
return None
|
|
||||||
|
|
||||||
log.info("Alarm config loaded: time=%s days=%s", alarm_time, alarm_days or "(every day)")
|
|
||||||
return data
|
|
||||||
|
|
||||||
|
|
||||||
def should_fire(config: dict) -> bool:
|
def should_fire(config: dict) -> bool:
|
||||||
"""Check if the alarm should fire right now based on config schedule.
|
"""Check if a single alarm entry should fire right now.
|
||||||
|
|
||||||
Rules:
|
Rules:
|
||||||
- alarm_time must match current HHMM
|
- alarm_time must match current HHMM
|
||||||
@@ -103,12 +126,10 @@ def should_fire(config: dict) -> bool:
|
|||||||
alarm_days = config.get("alarm_days")
|
alarm_days = config.get("alarm_days")
|
||||||
alarm_dates = config.get("alarm_dates")
|
alarm_dates = config.get("alarm_dates")
|
||||||
|
|
||||||
# alarm_days takes priority over alarm_dates
|
|
||||||
if alarm_days is not None:
|
if alarm_days is not None:
|
||||||
return now.strftime("%a") in alarm_days
|
return now.strftime("%a") in alarm_days
|
||||||
|
|
||||||
if alarm_dates is not None:
|
if alarm_dates is not None:
|
||||||
return now.strftime("%m/%d") in alarm_dates
|
return now.strftime("%m/%d") in alarm_dates
|
||||||
|
|
||||||
# Neither specified — fire every day
|
|
||||||
return True
|
return True
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
[
|
||||||
{
|
{
|
||||||
"alarm_time": "0730",
|
"alarm_time": "0730",
|
||||||
"alarm_days": ["Mon", "Tue", "Wed", "Thu", "Fri"],
|
"alarm_days": ["Mon", "Tue", "Wed", "Thu", "Fri"],
|
||||||
@@ -5,7 +6,8 @@
|
|||||||
"alarm_image": "assets/img/on_alarm.png"
|
"alarm_image": "assets/img/on_alarm.png"
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
"alarm_time": "2300",
|
"alarm_time": "2250",
|
||||||
"alarm_audio": "assets/alarm/sleep.wav",
|
"alarm_audio": "assets/alarm/sleep.wav",
|
||||||
"alarm_image": "assets/img/sleep.png"
|
"alarm_image": "assets/img/sleep.png"
|
||||||
}
|
}
|
||||||
|
]
|
||||||
|
|||||||
@@ -47,44 +47,50 @@ def _resolve_path(relative: str) -> Path:
|
|||||||
return p
|
return p
|
||||||
|
|
||||||
|
|
||||||
|
def _prepare_alarm(entry: dict) -> dict:
|
||||||
|
"""Pre-resolve paths and load resources for a single alarm entry."""
|
||||||
|
audio_path = find_wav(_resolve_path(entry.get("alarm_audio", "assets/alarm/alarm_test.wav")))
|
||||||
|
alarm_img_path = _resolve_path(entry.get("alarm_image", "assets/img/on_alarm.png"))
|
||||||
|
pcm, sr, ch, bits = read_wav(audio_path)
|
||||||
|
img = load_status_image(alarm_img_path)
|
||||||
|
return {
|
||||||
|
"config": entry,
|
||||||
|
"pcm": pcm, "sr": sr, "ch": ch, "bits": bits,
|
||||||
|
"img": img,
|
||||||
|
"last_fired": None,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
async def handler(ws):
|
async def handler(ws):
|
||||||
"""Handle a single WebSocket connection."""
|
"""Handle a single WebSocket connection."""
|
||||||
remote = ws.remote_address
|
remote = ws.remote_address
|
||||||
log.info("Client connected: %s:%d", remote[0], remote[1])
|
log.info("Client connected: %s:%d", remote[0], remote[1])
|
||||||
|
|
||||||
config = load_config(_config_path)
|
configs = load_config(_config_path)
|
||||||
|
|
||||||
# Resolve audio and image paths from config (or defaults)
|
|
||||||
if config:
|
|
||||||
audio_path = find_wav(_resolve_path(config.get("alarm_audio", "assets/alarm/alarm_test.wav")))
|
|
||||||
alarm_img_path = _resolve_path(config.get("alarm_image", "assets/img/on_alarm.png"))
|
|
||||||
else:
|
|
||||||
audio_path = None
|
|
||||||
alarm_img_path = IMG_DIR / "on_alarm.png"
|
|
||||||
|
|
||||||
img_idle = load_status_image(IMG_DIR / "idle.png")
|
img_idle = load_status_image(IMG_DIR / "idle.png")
|
||||||
img_alarm = load_status_image(alarm_img_path)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
await send_status_image(ws, img_idle)
|
await send_status_image(ws, img_idle)
|
||||||
|
|
||||||
if not config:
|
if not configs:
|
||||||
log.info("No alarms configured — idling forever")
|
log.info("No alarms configured — idling forever")
|
||||||
await asyncio.Future()
|
await asyncio.Future()
|
||||||
return
|
return
|
||||||
|
|
||||||
pcm, sr, ch, bits = read_wav(audio_path)
|
alarms = [_prepare_alarm(entry) for entry in configs]
|
||||||
last_fired_minute = None
|
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if should_fire(config):
|
for alarm in alarms:
|
||||||
|
if should_fire(alarm["config"]):
|
||||||
current_minute = datetime.now().strftime("%Y%m%d%H%M")
|
current_minute = datetime.now().strftime("%Y%m%d%H%M")
|
||||||
|
|
||||||
if current_minute != last_fired_minute:
|
if current_minute != alarm["last_fired"]:
|
||||||
last_fired_minute = current_minute
|
alarm["last_fired"] = current_minute
|
||||||
log.info("Alarm firing at %s", current_minute)
|
log.info("Alarm firing: %s at %s",
|
||||||
await send_status_image(ws, img_alarm)
|
alarm["config"]["alarm_time"], current_minute)
|
||||||
await stream_alarm(ws, pcm, sr, ch, bits)
|
await send_status_image(ws, alarm["img"])
|
||||||
|
await stream_alarm(ws, alarm["pcm"], alarm["sr"],
|
||||||
|
alarm["ch"], alarm["bits"])
|
||||||
await send_status_image(ws, img_idle)
|
await send_status_image(ws, img_idle)
|
||||||
|
|
||||||
await asyncio.sleep(TICK_INTERVAL)
|
await asyncio.sleep(TICK_INTERVAL)
|
||||||
|
|||||||
Reference in New Issue
Block a user