new alarm mechanism

This commit is contained in:
Mikkeli Matlock
2026-02-15 22:44:36 +09:00
parent e5cc124dd3
commit 89c975bf17
10 changed files with 243 additions and 22 deletions

View File

@@ -7,7 +7,8 @@
"Bash(do sed -i '/\\\\.static_bitmap = 0,/d' \"$f\")",
"Bash(done)",
"Bash(file:*)",
"mcp__ide__getDiagnostics"
"mcp__ide__getDiagnostics",
"Bash(python -m py_compile:*)"
]
},
"outputStyle": "iseri",

View File

@@ -25,8 +25,8 @@ File: `pi/alarm_config.json`
| `alarm_time` | `string` | 4-digit HHMM (24-hour). Triggers on the 0th second of the minute, or as soon as Python detects it. |
| `alarm_days` | `string[]` | (Optional) 3-letter day abbreviations: `Mon`, `Tue`, `Wed`, `Thu`, `Fri`, `Sat`, `Sun`. Alarm only fires on listed days. If not supplied, alarms every day. |
| `alarm_dates` | `string[]` | (Optional) Strings of `MM/DD` format. If both `alarm_days` and `alarm_dates` are set, only `alarm_days` is effective. |
| `alarm_audio` | `string` | Path to WAV file. Relative paths resolve from `pi/`. |
| `alarm_image` | `string` | Path to status PNG shown during alarm. Relative paths resolve from `pi/`. If not supplied, default to `assets/img/on_alarm.png` |
| `alarm_audio` | `string` | (Optional) Path to WAV file. Relative paths resolve from `pi/`. If not supplied, defaults to `assets/alarm/alarm_test.wav`. |
| `alarm_image` | `string` | (Optional) Path to status PNG shown during alarm. Relative paths resolve from `pi/`. If not supplied, defaults to `assets/img/on_alarm.png`. |
### Behavior

114
pi/alarm_scheduler.py Normal file
View File

@@ -0,0 +1,114 @@
"""Alarm scheduler — load config and check firing schedule."""
import json
import logging
import re
from datetime import datetime
from pathlib import Path
log = logging.getLogger(__name__)
DEFAULT_CONFIG_PATH = Path(__file__).parent / "config" / "alarms.json"
VALID_DAYS = {"Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"}
TIME_RE = re.compile(r"^([01]\d|2[0-3])[0-5]\d$")
def load_config(path: Path) -> dict | None:
"""Read and validate alarm config JSON.
Returns the config dict, or None if the file is missing, empty, or invalid.
Never raises — logs warnings and returns None on any problem.
"""
try:
text = path.read_text(encoding="utf-8").strip()
except FileNotFoundError:
log.warning("Config file not found: %s", path)
return None
except OSError as e:
log.warning("Cannot read config %s: %s", path, e)
return None
if not text:
log.warning("Config file is empty: %s", path)
return None
try:
data = json.loads(text)
except json.JSONDecodeError as e:
log.warning("Invalid JSON in %s: %s", path, e)
return None
# Empty object or empty list means "no alarms"
if not data or (isinstance(data, list) and len(data) == 0):
log.info("Config is empty — no alarms configured")
return None
if not isinstance(data, dict):
log.warning("Config must be a JSON object, got %s", type(data).__name__)
return None
# Validate alarm_time (required)
alarm_time = data.get("alarm_time")
if alarm_time is None:
log.warning("Missing required field 'alarm_time'")
return None
if not isinstance(alarm_time, str) or not TIME_RE.match(alarm_time):
log.warning("Invalid alarm_time '%s' — must be 4-digit HHMM (e.g. '0730')", alarm_time)
return None
# Validate alarm_days (optional)
alarm_days = data.get("alarm_days")
if alarm_days is not None:
if not isinstance(alarm_days, list) or not all(isinstance(d, str) for d in alarm_days):
log.warning("alarm_days must be a list of strings")
return None
bad = [d for d in alarm_days if d not in VALID_DAYS]
if bad:
log.warning("Invalid day abbreviations in alarm_days: %s", bad)
return None
# Validate alarm_dates (optional)
alarm_dates = data.get("alarm_dates")
if alarm_dates is not None:
if not isinstance(alarm_dates, list) or not all(isinstance(d, str) for d in alarm_dates):
log.warning("alarm_dates must be a list of strings")
return None
date_re = re.compile(r"^(0[1-9]|1[0-2])/(0[1-9]|[12]\d|3[01])$")
bad = [d for d in alarm_dates if not date_re.match(d)]
if bad:
log.warning("Invalid date formats in alarm_dates (expected MM/DD): %s", bad)
return None
log.info("Alarm config loaded: time=%s days=%s", alarm_time, alarm_days or "(every day)")
return data
def should_fire(config: dict) -> bool:
"""Check if the alarm should fire right now based on config schedule.
Rules:
- alarm_time must match current HHMM
- If alarm_days is present, today's 3-letter abbreviation must be in the list
- If alarm_days is absent but alarm_dates is present, today's MM/DD must match
- If neither alarm_days nor alarm_dates is present, fires every day
- If both are present, alarm_days wins (alarm_dates ignored)
"""
now = datetime.now()
current_hhmm = now.strftime("%H%M")
if config["alarm_time"] != current_hhmm:
return False
alarm_days = config.get("alarm_days")
alarm_dates = config.get("alarm_dates")
# alarm_days takes priority over alarm_dates
if alarm_days is not None:
return now.strftime("%a") in alarm_days
if alarm_dates is not None:
return now.strftime("%m/%d") in alarm_dates
# Neither specified — fire every day
return True

42
pi/assets/alarm/sleep.lab Normal file
View File

@@ -0,0 +1,42 @@
0 850000 s
850000 1580000 u
1580000 2220000 i
2220000 2660000 m
2660000 3370000 i
3370000 4140000 N
4140000 4610000 g
4610000 5780000 a
5780000 13780000 pau
13780000 14350000 k
14350000 15010000 i
15010000 15730000 e
15730000 16160000 t
16160000 16800000 e
16800000 17260000 k
17260000 17840000 u
17840000 18390000 d
18390000 19090000 a
19090000 19700000 s
19700000 20390000 a
20390000 20830000 r
20830000 22120000 i
22120000 23620000 pau
23620000 24390000 a
24390000 24810000 r
24810000 25430000 i
25430000 25860000 g
25860000 26550000 a
26550000 27000000 t
27000000 27780000 o
27780000 28520000 o
28520000 29000000 g
29000000 29740000 o
29740000 30260000 z
30260000 31110000 a
31110000 31790000 i
31790000 32190000 m
32190000 32870000 a
32870000 33480000 sh
33480000 34040000 i
34040000 34520000 t
34520000 35950000 a

BIN
pi/assets/alarm/sleep.wav Normal file

Binary file not shown.

BIN
pi/assets/img/sleep.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 47 KiB

View File

@@ -12,8 +12,19 @@ CHUNK_SIZE = 4096
AUDIO_DIR = Path(__file__).parent / "assets" / "alarm"
def find_wav() -> Path:
"""Find the first .wav file in the alarm assets directory."""
def find_wav(path: Path | None = None) -> Path:
"""Return a WAV file path.
If *path* is given and points to an existing file, use it directly.
Otherwise fall back to the first .wav found in the alarm assets directory.
"""
if path is not None:
p = Path(path)
if p.is_file():
log.info("Using audio file: %s", p)
return p
log.warning("Specified audio path not found: %s — falling back to glob", p)
wavs = list(AUDIO_DIR.glob("*.wav"))
if not wavs:
raise FileNotFoundError(f"No .wav files found in {AUDIO_DIR}")

11
pi/config/alarms.json Normal file
View File

@@ -0,0 +1,11 @@
{
"alarm_time": "0730",
"alarm_days": ["Mon", "Tue", "Wed", "Thu", "Fri"],
"alarm_audio": "assets/alarm/alarm_test.wav",
"alarm_image": "assets/img/on_alarm.png"
},
{
"alarm_time": "2300",
"alarm_audio": "assets/alarm/sleep.wav",
"alarm_image": "assets/img/sleep.png"
}

View File

@@ -15,12 +15,15 @@ Protocol:
3. Text frame: {"type":"alarm_stop"}
"""
import argparse
import asyncio
import logging
from random import randint
from datetime import datetime
from pathlib import Path
import websockets
from alarm_scheduler import DEFAULT_CONFIG_PATH, load_config, should_fire
from audio_handler import find_wav, read_wav, stream_alarm
from image_handler import IMG_DIR, load_status_image, send_status_image
@@ -28,6 +31,20 @@ logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(mess
log = logging.getLogger("contents_server")
PORT = 8766
PI_DIR = Path(__file__).parent
# Set by main(), read by handler()
_config_path: Path = DEFAULT_CONFIG_PATH
TICK_INTERVAL = 5 # seconds between schedule checks
def _resolve_path(relative: str) -> Path:
"""Resolve a config path relative to pi/ directory."""
p = Path(relative)
if not p.is_absolute():
p = PI_DIR / p
return p
async def handler(ws):
@@ -35,27 +52,43 @@ async def handler(ws):
remote = ws.remote_address
log.info("Client connected: %s:%d", remote[0], remote[1])
wav_path = find_wav()
pcm, sr, ch, bits = read_wav(wav_path)
config = load_config(_config_path)
# Resolve audio and image paths from config (or defaults)
if config:
audio_path = find_wav(_resolve_path(config.get("alarm_audio", "assets/alarm/alarm_test.wav")))
alarm_img_path = _resolve_path(config.get("alarm_image", "assets/img/on_alarm.png"))
else:
audio_path = None
alarm_img_path = IMG_DIR / "on_alarm.png"
# Load status images
img_idle = load_status_image(IMG_DIR / "idle.png")
img_alarm = load_status_image(IMG_DIR / "on_alarm.png")
img_alarm = load_status_image(alarm_img_path)
try:
# Send idle image on connect
await send_status_image(ws, img_idle)
while True:
delay = randint(30, 60)
log.info("Next alarm in %ds", delay)
await asyncio.sleep(delay)
if not config:
log.info("No alarms configured — idling forever")
await asyncio.Future()
return
pcm, sr, ch, bits = read_wav(audio_path)
last_fired_minute = None
while True:
if should_fire(config):
current_minute = datetime.now().strftime("%Y%m%d%H%M")
if current_minute != last_fired_minute:
last_fired_minute = current_minute
log.info("Alarm firing at %s", current_minute)
await send_status_image(ws, img_alarm)
await stream_alarm(ws, pcm, sr, ch, bits)
await send_status_image(ws, img_idle)
await asyncio.sleep(TICK_INTERVAL)
# Switch to alarm image before audio
await send_status_image(ws, img_alarm)
await stream_alarm(ws, pcm, sr, ch, bits)
# Switch back to idle after alarm
await send_status_image(ws, img_idle)
except websockets.exceptions.ConnectionClosed:
log.info("Client disconnected: %s:%d", remote[0], remote[1])
@@ -63,8 +96,13 @@ async def handler(ws):
async def main():
log.info("Contents server starting on port %d", PORT)
async with websockets.serve(handler, "0.0.0.0", PORT):
await asyncio.Future() # run forever
await asyncio.Future()
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Alarm contents server")
parser.add_argument("--config", type=Path, default=DEFAULT_CONFIG_PATH,
help="Path to alarm config JSON (default: %(default)s)")
args = parser.parse_args()
_config_path = args.config
asyncio.run(main())

View File

@@ -4,9 +4,13 @@ import subprocess, sys, signal
from pathlib import Path
d = Path(__file__).parent
# Forward any CLI args (e.g. --config) to contents_server
extra_args = sys.argv[1:]
procs = [
subprocess.Popen([sys.executable, d / "stats_server.py"]),
subprocess.Popen([sys.executable, d / "contents_server.py"]),
subprocess.Popen([sys.executable, d / "contents_server.py"] + extra_args),
]
signal.signal(signal.SIGINT, lambda *_: [p.terminate() for p in procs])
signal.signal(signal.SIGTERM, lambda *_: [p.terminate() for p in procs])