lte service (backend) and ui handling
This commit is contained in:
178
pi/backend/lte_service.py
Normal file
178
pi/backend/lte_service.py
Normal file
@@ -0,0 +1,178 @@
|
||||
"""LTE service - polls ModemManager for signal quality and connection state."""
|
||||
|
||||
import random
|
||||
import subprocess
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
# ============================================================================
|
||||
# DEBUG MODE - Set True for development without modem hardware
|
||||
# When True: skips mmcli entirely, generates mock LTE data
|
||||
# When False: polls real ModemManager via mmcli
|
||||
# ============================================================================
|
||||
_LTE_DEBUG = True
|
||||
|
||||
|
||||
class LteService:
|
||||
"""Threaded LTE modem status poller.
|
||||
|
||||
Polls `mmcli -m 0` every 5 seconds, parses signal quality, connection
|
||||
state, operator, and access technology. Emits dict via callback.
|
||||
|
||||
No history buffer — historical signal strength isn't useful.
|
||||
"""
|
||||
|
||||
def __init__(self, poll_interval: float = 5.0):
|
||||
self.poll_interval = poll_interval
|
||||
|
||||
self._latest: dict[str, Any] = {}
|
||||
self._connected = False # True when mmcli responds (modem alive)
|
||||
self._running = False
|
||||
self._thread: threading.Thread | None = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
# Callback for push-based updates
|
||||
self._on_data_callback = None
|
||||
|
||||
def set_on_data(self, callback):
|
||||
"""Set callback for new LTE data. Called with data dict."""
|
||||
self._on_data_callback = callback
|
||||
|
||||
@property
|
||||
def connected(self) -> bool:
|
||||
"""Whether the modem is reachable via mmcli."""
|
||||
return self._connected
|
||||
|
||||
def get_latest(self) -> dict[str, Any]:
|
||||
"""Get most recent LTE status."""
|
||||
with self._lock:
|
||||
return self._latest.copy() if self._latest else {"error": "no data"}
|
||||
|
||||
def start(self):
|
||||
"""Start background LTE poller thread."""
|
||||
if self._running:
|
||||
return
|
||||
self._running = True
|
||||
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
|
||||
self._thread.start()
|
||||
print("[LTE] Service started")
|
||||
|
||||
def stop(self):
|
||||
"""Stop background poller."""
|
||||
self._running = False
|
||||
if self._thread:
|
||||
self._thread.join(timeout=2.0)
|
||||
|
||||
def _poll_loop(self):
|
||||
"""Main poll loop."""
|
||||
print("[LTE] Poller thread running")
|
||||
while self._running:
|
||||
try:
|
||||
if _LTE_DEBUG:
|
||||
data = self._stub_poll()
|
||||
else:
|
||||
data = self._real_poll()
|
||||
|
||||
with self._lock:
|
||||
self._latest = data
|
||||
|
||||
if self._on_data_callback:
|
||||
self._on_data_callback(data)
|
||||
|
||||
except Exception as e:
|
||||
print(f"[LTE] Poll error: {e}")
|
||||
self._connected = False
|
||||
|
||||
time.sleep(self.poll_interval)
|
||||
|
||||
def _real_poll(self) -> dict[str, Any]:
|
||||
"""Poll mmcli -m 0 and parse output."""
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["mmcli", "-m", "0"],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=5.0,
|
||||
)
|
||||
|
||||
if result.returncode != 0:
|
||||
self._connected = False
|
||||
return {
|
||||
"connected": False,
|
||||
"signal": 0,
|
||||
"operator": None,
|
||||
"access_tech": None,
|
||||
}
|
||||
|
||||
output = result.stdout
|
||||
self._connected = True
|
||||
|
||||
# Parse signal quality: "signal quality: 68 (recent)"
|
||||
signal = 0
|
||||
m = re.search(r"signal quality:\s*(\d+)", output)
|
||||
if m:
|
||||
signal = int(m.group(1))
|
||||
|
||||
# Parse state: "state: connected" / "registered" / "searching" etc.
|
||||
state = None
|
||||
m = re.search(r"^\s*state:\s*(\S+)", output, re.MULTILINE)
|
||||
if m:
|
||||
state = m.group(1).strip("'\"")
|
||||
|
||||
# Parse operator: "operator name: KDDI KDDI"
|
||||
operator = None
|
||||
m = re.search(r"operator name:\s*(.+)", output)
|
||||
if m:
|
||||
operator = m.group(1).strip()
|
||||
|
||||
# Parse access tech: "access tech: lte"
|
||||
access_tech = None
|
||||
m = re.search(r"access tech:\s*(\S+)", output)
|
||||
if m:
|
||||
access_tech = m.group(1).strip("'\"")
|
||||
|
||||
network_connected = state in ("connected", "registered")
|
||||
|
||||
return {
|
||||
"connected": network_connected,
|
||||
"signal": signal,
|
||||
"operator": operator,
|
||||
"access_tech": access_tech,
|
||||
}
|
||||
|
||||
except subprocess.TimeoutExpired:
|
||||
print("[LTE] mmcli timed out")
|
||||
self._connected = False
|
||||
return {
|
||||
"connected": False,
|
||||
"signal": 0,
|
||||
"operator": None,
|
||||
"access_tech": None,
|
||||
}
|
||||
except FileNotFoundError:
|
||||
print("[LTE] mmcli not found, falling back to stub mode")
|
||||
self._connected = False
|
||||
return self._stub_poll()
|
||||
|
||||
# Stub state lives across polls
|
||||
_stub_signal: float = 70.0
|
||||
|
||||
def _stub_poll(self) -> dict[str, Any]:
|
||||
"""Generate mock LTE data for development.
|
||||
|
||||
Simulates connected state with signal wandering 60-80.
|
||||
"""
|
||||
self._connected = True
|
||||
|
||||
# Random walk signal, clamped to 60-80
|
||||
self._stub_signal += random.uniform(-3, 3)
|
||||
self._stub_signal = max(60, min(80, self._stub_signal))
|
||||
|
||||
return {
|
||||
"connected": True,
|
||||
"signal": int(self._stub_signal),
|
||||
"operator": "STUB",
|
||||
"access_tech": "lte",
|
||||
}
|
||||
Reference in New Issue
Block a user