"""LTE service - polls ModemManager for signal quality and connection state.""" import random import subprocess import re import threading import time from typing import Any # ============================================================================ # DEBUG MODE - Set True for development without modem hardware # When True: skips mmcli entirely, generates mock LTE data # When False: polls real ModemManager via mmcli # ============================================================================ _LTE_DEBUG = True class LteService: """Threaded LTE modem status poller. Polls `mmcli -m 0` every 5 seconds, parses signal quality, connection state, operator, and access technology. Emits dict via callback. No history buffer — historical signal strength isn't useful. """ def __init__(self, poll_interval: float = 5.0): self.poll_interval = poll_interval self._latest: dict[str, Any] = {} self._connected = False # True when mmcli responds (modem alive) self._running = False self._thread: threading.Thread | None = None self._lock = threading.Lock() # Callback for push-based updates self._on_data_callback = None def set_on_data(self, callback): """Set callback for new LTE data. Called with data dict.""" self._on_data_callback = callback @property def connected(self) -> bool: """Whether the modem is reachable via mmcli.""" return self._connected def get_latest(self) -> dict[str, Any]: """Get most recent LTE status.""" with self._lock: return self._latest.copy() if self._latest else {"error": "no data"} def start(self): """Start background LTE poller thread.""" if self._running: return self._running = True self._thread = threading.Thread(target=self._poll_loop, daemon=True) self._thread.start() print("[LTE] Service started") def stop(self): """Stop background poller.""" self._running = False if self._thread: self._thread.join(timeout=2.0) def _poll_loop(self): """Main poll loop.""" print("[LTE] Poller thread running") while self._running: try: if _LTE_DEBUG: data = self._stub_poll() else: data = self._real_poll() with self._lock: self._latest = data if self._on_data_callback: self._on_data_callback(data) except Exception as e: print(f"[LTE] Poll error: {e}") self._connected = False time.sleep(self.poll_interval) def _real_poll(self) -> dict[str, Any]: """Poll mmcli -m 0 and parse output.""" try: result = subprocess.run( ["mmcli", "-m", "0"], capture_output=True, text=True, timeout=5.0, ) if result.returncode != 0: self._connected = False return { "connected": False, "signal": 0, "operator": None, "access_tech": None, } output = result.stdout self._connected = True # Parse signal quality: "signal quality: 68 (recent)" signal = 0 m = re.search(r"signal quality:\s*(\d+)", output) if m: signal = int(m.group(1)) # Parse state: "state: connected" / "registered" / "searching" etc. state = None m = re.search(r"^\s*state:\s*(\S+)", output, re.MULTILINE) if m: state = m.group(1).strip("'\"") # Parse operator: "operator name: KDDI KDDI" operator = None m = re.search(r"operator name:\s*(.+)", output) if m: operator = m.group(1).strip() # Parse access tech: "access tech: lte" access_tech = None m = re.search(r"access tech:\s*(\S+)", output) if m: access_tech = m.group(1).strip("'\"") network_connected = state in ("connected", "registered") return { "connected": network_connected, "signal": signal, "operator": operator, "access_tech": access_tech, } except subprocess.TimeoutExpired: print("[LTE] mmcli timed out") self._connected = False return { "connected": False, "signal": 0, "operator": None, "access_tech": None, } except FileNotFoundError: print("[LTE] mmcli not found, falling back to stub mode") self._connected = False return self._stub_poll() # Stub state lives across polls _stub_signal: float = 70.0 def _stub_poll(self) -> dict[str, Any]: """Generate mock LTE data for development. Simulates connected state with signal wandering 60-80. """ self._connected = True # Random walk signal, clamped to 60-80 self._stub_signal += random.uniform(-3, 3) self._stub_signal = max(60, min(80, self._stub_signal)) return { "connected": True, "signal": int(self._stub_signal), "operator": "STUB", "access_tech": "lte", }