Files
smart-serow/pi/backend/lte_service.py

179 lines
5.5 KiB
Python
Raw Permalink Normal View History

2026-02-09 02:36:03 +09:00
"""LTE service - polls ModemManager for signal quality and connection state."""
import random
import subprocess
import re
import threading
import time
from typing import Any
# ============================================================================
# DEBUG MODE - Set True for development without modem hardware
# When True: skips mmcli entirely, generates mock LTE data
# When False: polls real ModemManager via mmcli
# ============================================================================
_LTE_DEBUG = True
class LteService:
"""Threaded LTE modem status poller.
Polls `mmcli -m 0` every 5 seconds, parses signal quality, connection
state, operator, and access technology. Emits dict via callback.
No history buffer historical signal strength isn't useful.
"""
def __init__(self, poll_interval: float = 5.0):
self.poll_interval = poll_interval
self._latest: dict[str, Any] = {}
self._connected = False # True when mmcli responds (modem alive)
self._running = False
self._thread: threading.Thread | None = None
self._lock = threading.Lock()
# Callback for push-based updates
self._on_data_callback = None
def set_on_data(self, callback):
"""Set callback for new LTE data. Called with data dict."""
self._on_data_callback = callback
@property
def connected(self) -> bool:
"""Whether the modem is reachable via mmcli."""
return self._connected
def get_latest(self) -> dict[str, Any]:
"""Get most recent LTE status."""
with self._lock:
return self._latest.copy() if self._latest else {"error": "no data"}
def start(self):
"""Start background LTE poller thread."""
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
self._thread.start()
print("[LTE] Service started")
def stop(self):
"""Stop background poller."""
self._running = False
if self._thread:
self._thread.join(timeout=2.0)
def _poll_loop(self):
"""Main poll loop."""
print("[LTE] Poller thread running")
while self._running:
try:
if _LTE_DEBUG:
data = self._stub_poll()
else:
data = self._real_poll()
with self._lock:
self._latest = data
if self._on_data_callback:
self._on_data_callback(data)
except Exception as e:
print(f"[LTE] Poll error: {e}")
self._connected = False
time.sleep(self.poll_interval)
def _real_poll(self) -> dict[str, Any]:
"""Poll mmcli -m 0 and parse output."""
try:
result = subprocess.run(
["mmcli", "-m", "0"],
capture_output=True,
text=True,
timeout=5.0,
)
if result.returncode != 0:
self._connected = False
return {
"connected": False,
"signal": 0,
"operator": None,
"access_tech": None,
}
output = result.stdout
self._connected = True
# Parse signal quality: "signal quality: 68 (recent)"
signal = 0
m = re.search(r"signal quality:\s*(\d+)", output)
if m:
signal = int(m.group(1))
# Parse state: "state: connected" / "registered" / "searching" etc.
state = None
m = re.search(r"^\s*state:\s*(\S+)", output, re.MULTILINE)
if m:
state = m.group(1).strip("'\"")
# Parse operator: "operator name: KDDI KDDI"
operator = None
m = re.search(r"operator name:\s*(.+)", output)
if m:
operator = m.group(1).strip()
# Parse access tech: "access tech: lte"
access_tech = None
m = re.search(r"access tech:\s*(\S+)", output)
if m:
access_tech = m.group(1).strip("'\"")
network_connected = state in ("connected", "registered")
return {
"connected": network_connected,
"signal": signal,
"operator": operator,
"access_tech": access_tech,
}
except subprocess.TimeoutExpired:
print("[LTE] mmcli timed out")
self._connected = False
return {
"connected": False,
"signal": 0,
"operator": None,
"access_tech": None,
}
except FileNotFoundError:
print("[LTE] mmcli not found, falling back to stub mode")
self._connected = False
return self._stub_poll()
# Stub state lives across polls
_stub_signal: float = 70.0
def _stub_poll(self) -> dict[str, Any]:
"""Generate mock LTE data for development.
Simulates connected state with signal wandering 60-80.
"""
self._connected = True
# Random walk signal, clamped to 60-80
self._stub_signal += random.uniform(-3, 3)
self._stub_signal = max(60, min(80, self._stub_signal))
return {
"connected": True,
"signal": int(self._stub_signal),
"operator": "STUB",
"access_tech": "lte",
}