Compare commits

..

6 Commits

Author SHA1 Message Date
Mikkeli Matlock
a46496d688 gps related minor fixes 2026-02-09 17:48:38 +09:00
Mikkeli Matlock
47b3427e63 lte service (backend) and ui handling 2026-02-09 02:36:03 +09:00
Mikkeli Matlock
12a0d58800 switched backend gps service to real mode 2026-02-09 02:35:48 +09:00
629c735eec gps manipulations tailored to sim7600h hat 2026-02-09 02:11:55 +09:00
Mikkeli Matlock
992270ed00 ideas 2026-02-08 03:20:07 +09:00
Mikkeli Matlock
83af09b47c ui: system status bar looks tweak 2026-02-08 03:20:00 +09:00
21 changed files with 2777 additions and 2362 deletions

5
IDEAS.md Normal file
View File

@@ -0,0 +1,5 @@
Note to keep inspirations lest I forget.
# Things to do, but not really urgent
- Fit OpenStreetMap somewhere and have a proper map widget in UI (not really navs, just show where I am)
- Integrate paho-mqtt into Python backend for some telemetry. Also set up mosquitto or whatnots on vps.

View File

@@ -178,6 +178,19 @@ The Pi's internal pull-down (~50kΩ) will overpower high-value external resistor
Physical switches/connectors need debouncing. Current implementation requires 15 consecutive identical readings (~750ms at 20Hz) before accepting a state change. Tune `required_consecutive` in `gpio_service.py` as needed.
## Utilities
Standalone tools live in `pi/utils/` (not part of the backend service):
| Tool | Description |
|------|-------------|
| `at_terminal.py` | Interactive AT command terminal for SIM7600 (pyserial). Default port: `/dev/ttyUSB2` |
```bash
python pi/utils/at_terminal.py # default /dev/ttyUSB2
python pi/utils/at_terminal.py /dev/ttyUSB3 # specify port
```
## Deploy
TODO: Add to `scripts/deploy.py` as second target + systemd service.

View File

@@ -11,7 +11,7 @@ from typing import Any
# When True: skips gpsd entirely, generates realistic mock data
# When False: connects to real gpsd (requires GPS device)
# ============================================================================
_GPS_DEBUG = True
_GPS_DEBUG = False
# gpsdclient is a modern, simple gpsd client
# Install gpsd on Pi: sudo apt install gpsd gpsd-clients
@@ -40,8 +40,12 @@ class GPSService:
# Callback for push-based updates
self._on_data_callback = None
# GPS state tracking (NMEA can't distinguish "acquiring" from "lost")
self._has_ever_fixed = False # True after first valid fix this session
# Periodic status logging
self._last_status_log = 0.0
self._last_state_emit = 0.0
self._fix_count = 0
def set_on_data(self, callback):
@@ -57,6 +61,17 @@ class GPSService:
with self._lock:
return self._latest.copy() if self._latest else {"error": "no data"}
def _gps_state(self, fix: dict) -> str:
"""Determine GPS state: acquiring, fix, or lost.
NMEA doesn't distinguish 'never had fix' from 'lost signal' — both
report mode 1 with no position. We track it ourselves.
"""
has_fix = fix.get("mode") in (2, 3) and fix.get("lat") is not None
if has_fix:
return "fix"
return "lost" if self._has_ever_fixed else "acquiring"
def get_buffer(self) -> list[dict[str, Any]]:
"""Get buffered GPS history."""
with self._lock:
@@ -122,7 +137,8 @@ class GPSService:
self._last_status_log = time.time()
self._fix_count = 0
first_fix_timeout = time.time() + 5.0 # 5s to get first fix
# 120s for initial cold fix, 10s for signal loss after first fix
fix_timeout = time.time() + 120.0
for result in client.dict_stream(filter=["TPV"]):
if not self._running:
@@ -140,16 +156,32 @@ class GPSService:
"satellites": result.get("satellites"), # from SKY messages
}
# Compute state and attach to fix
fix["gps_state"] = self._gps_state(fix)
# Check if this is a real fix (has position) or just empty TPV
if fix.get("lat") is None and fix.get("mode") in (None, 0, 1):
# No real data yet, check timeout
if time.time() > first_fix_timeout:
print("[GPS] No GPS fix after 5s, will retry connection")
if time.time() > fix_timeout:
timeout_s = "120s" if not self._has_ever_fixed else "10s"
print(f"[GPS] No GPS fix after {timeout_s}, will retry connection")
raise ConnectionError("No GPS fix within timeout")
# Emit state periodically so UI knows we're alive
now = time.time()
if now - self._last_state_emit >= 5.0:
self._last_state_emit = now
with self._lock:
self._latest = fix
if self._on_data_callback:
self._on_data_callback(fix)
continue # Skip empty fixes
# Got real data, disable timeout
first_fix_timeout = float('inf')
# Got real data — mark first fix, reset timeout to shorter window
if not self._has_ever_fixed:
self._has_ever_fixed = True
self._last_state_emit = 0.0 # Force immediate emit on transition
print("[GPS] First fix acquired")
fix_timeout = time.time() + 10.0 # 10s timeout for signal loss
with self._lock:
self._latest = fix
@@ -178,8 +210,9 @@ class GPSService:
"""Generate realistic mock GPS data for development/testing.
Simulates:
- Initial acquisition delay (~3s before first fix)
- Normal 3D fix with satellites
- Occasional signal loss (~3% chance per second, lasts ~5s)
- Occasional signal loss (~30% chance per second, lasts ~2s)
- Wandering position near Tokyo
"""
self._last_status_log = time.time()
@@ -189,6 +222,9 @@ class GPSService:
signal_lost = False
signal_lost_until = 0.0
# Simulate cold start acquisition (~30s)
acquiring_until = time.time() + 30.0
# Base position (Tokyo area)
base_lat = 35.6762
base_lon = 139.6503
@@ -202,16 +238,20 @@ class GPSService:
self._connected = True
now = time.time()
# Check for signal loss simulation
if signal_lost:
if now >= signal_lost_until:
signal_lost = False
# Simulate initial acquisition period
if now < acquiring_until:
signal_lost = True # No fix yet
elif signal_lost and now >= signal_lost_until:
signal_lost = False
if self._has_ever_fixed:
print("[GPS] Signal recovered (stub)")
else:
else:
print("[GPS] First fix acquired (stub)")
elif not signal_lost:
# ~30% chance per second to lose signal
if random.random() < 0.3:
signal_lost = True
signal_lost_until = now + 2 # fixed 2s loss
signal_lost_until = now + 2 # fixed 2s loss
print("[GPS] Signal loss simulation (stub)")
if signal_lost:
@@ -232,6 +272,9 @@ class GPSService:
heading = (heading + random.uniform(1, 3)) % 360
speed = max(0, min(30, speed + random.uniform(-2, 2)))
if not self._has_ever_fixed:
self._has_ever_fixed = True
fix = {
"time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"lat": base_lat + random.uniform(-0.001, 0.001),
@@ -243,6 +286,9 @@ class GPSService:
"satellites": random.randint(6, 12),
}
# Attach state — same logic as real GPS path
fix["gps_state"] = self._gps_state(fix)
with self._lock:
self._latest = fix
if fix.get("lat") is not None:

178
pi/backend/lte_service.py Normal file
View File

@@ -0,0 +1,178 @@
"""LTE service - polls ModemManager for signal quality and connection state."""
import random
import subprocess
import re
import threading
import time
from typing import Any
# ============================================================================
# DEBUG MODE - Set True for development without modem hardware
# When True: skips mmcli entirely, generates mock LTE data
# When False: polls real ModemManager via mmcli
# ============================================================================
_LTE_DEBUG = True
class LteService:
"""Threaded LTE modem status poller.
Polls `mmcli -m 0` every 5 seconds, parses signal quality, connection
state, operator, and access technology. Emits dict via callback.
No history buffer — historical signal strength isn't useful.
"""
def __init__(self, poll_interval: float = 5.0):
self.poll_interval = poll_interval
self._latest: dict[str, Any] = {}
self._connected = False # True when mmcli responds (modem alive)
self._running = False
self._thread: threading.Thread | None = None
self._lock = threading.Lock()
# Callback for push-based updates
self._on_data_callback = None
def set_on_data(self, callback):
"""Set callback for new LTE data. Called with data dict."""
self._on_data_callback = callback
@property
def connected(self) -> bool:
"""Whether the modem is reachable via mmcli."""
return self._connected
def get_latest(self) -> dict[str, Any]:
"""Get most recent LTE status."""
with self._lock:
return self._latest.copy() if self._latest else {"error": "no data"}
def start(self):
"""Start background LTE poller thread."""
if self._running:
return
self._running = True
self._thread = threading.Thread(target=self._poll_loop, daemon=True)
self._thread.start()
print("[LTE] Service started")
def stop(self):
"""Stop background poller."""
self._running = False
if self._thread:
self._thread.join(timeout=2.0)
def _poll_loop(self):
"""Main poll loop."""
print("[LTE] Poller thread running")
while self._running:
try:
if _LTE_DEBUG:
data = self._stub_poll()
else:
data = self._real_poll()
with self._lock:
self._latest = data
if self._on_data_callback:
self._on_data_callback(data)
except Exception as e:
print(f"[LTE] Poll error: {e}")
self._connected = False
time.sleep(self.poll_interval)
def _real_poll(self) -> dict[str, Any]:
"""Poll mmcli -m 0 and parse output."""
try:
result = subprocess.run(
["mmcli", "-m", "0"],
capture_output=True,
text=True,
timeout=5.0,
)
if result.returncode != 0:
self._connected = False
return {
"connected": False,
"signal": 0,
"operator": None,
"access_tech": None,
}
output = result.stdout
self._connected = True
# Parse signal quality: "signal quality: 68 (recent)"
signal = 0
m = re.search(r"signal quality:\s*(\d+)", output)
if m:
signal = int(m.group(1))
# Parse state: "state: connected" / "registered" / "searching" etc.
state = None
m = re.search(r"^\s*state:\s*(\S+)", output, re.MULTILINE)
if m:
state = m.group(1).strip("'\"")
# Parse operator: "operator name: KDDI KDDI"
operator = None
m = re.search(r"operator name:\s*(.+)", output)
if m:
operator = m.group(1).strip()
# Parse access tech: "access tech: lte"
access_tech = None
m = re.search(r"access tech:\s*(\S+)", output)
if m:
access_tech = m.group(1).strip("'\"")
network_connected = state in ("connected", "registered")
return {
"connected": network_connected,
"signal": signal,
"operator": operator,
"access_tech": access_tech,
}
except subprocess.TimeoutExpired:
print("[LTE] mmcli timed out")
self._connected = False
return {
"connected": False,
"signal": 0,
"operator": None,
"access_tech": None,
}
except FileNotFoundError:
print("[LTE] mmcli not found, falling back to stub mode")
self._connected = False
return self._stub_poll()
# Stub state lives across polls
_stub_signal: float = 70.0
def _stub_poll(self) -> dict[str, Any]:
"""Generate mock LTE data for development.
Simulates connected state with signal wandering 60-80.
"""
self._connected = True
# Random walk signal, clamped to 60-80
self._stub_signal += random.uniform(-3, 3)
self._stub_signal = max(60, min(80, self._stub_signal))
return {
"connected": True,
"signal": int(self._stub_signal),
"operator": "STUB",
"access_tech": "lte",
}

View File

@@ -9,6 +9,7 @@ from flask_socketio import SocketIO, emit
from gps_service import GPSService
from arduino_service import ArduinoService
from gpio_service import GPIOService
from lte_service import LteService
from throttle import Throttle
app = Flask(__name__)
@@ -21,10 +22,12 @@ socketio = SocketIO(app, async_mode="gevent", cors_allowed_origins="*")
gps = GPSService()
arduino = ArduinoService()
gpio = GPIOService()
lte = LteService()
# Throttles for emission rate limiting (20Hz for arduino, 1Hz for GPS)
# Throttles for emission rate limiting (20Hz for arduino, 1Hz for GPS, 5s for LTE)
arduino_throttle = Throttle(min_interval=0.05) # 20Hz max
gps_throttle = Throttle(min_interval=1.0) # 1Hz max
lte_throttle = Throttle(min_interval=5.0) # Every 5s — signal doesn't need real-time
# Track connected clients
connected_clients = set()
@@ -57,6 +60,10 @@ def handle_connect():
if "error" not in gps_data:
emit("gps", gps_data)
lte_data = lte.get_latest()
if "error" not in lte_data:
emit("lte", lte_data)
@socketio.on("disconnect")
def handle_disconnect():
@@ -133,6 +140,10 @@ def on_arduino_data(data):
data = dict(data) # Don't mutate original
data["theme_switch"] = gpio.theme_switch
# backend voltage offset correction
if "voltage" in data:
data["voltage"] += 0.2 # Calibration offset
def emit_fn(d):
socketio.emit("arduino", d)
@@ -147,6 +158,14 @@ def on_gps_data(data):
gps_throttle.maybe_emit(data, emit_fn)
def on_lte_data(data):
"""Called by LteService when new status polled."""
def emit_fn(d):
socketio.emit("lte", d)
lte_throttle.maybe_emit(data, emit_fn)
def on_arduino_ack(cmd, status, extra):
"""Called by ArduinoService when ACK received from Arduino."""
socketio.emit("ack", {
@@ -172,6 +191,9 @@ def throttle_flusher():
if gps_throttle.has_pending:
gps_throttle.flush(lambda d: socketio.emit("gps", d))
if lte_throttle.has_pending:
lte_throttle.flush(lambda d: socketio.emit("lte", d))
# -----------------------------------------------------------------------------
# REST API (backward compatibility)
@@ -180,10 +202,15 @@ def throttle_flusher():
@app.route("/health")
def health():
"""Health check endpoint."""
gps_latest = gps.get_latest()
lte_latest = lte.get_latest()
return jsonify({
"status": "ok",
"gps_connected": gps.connected,
"gps_state": gps_latest.get("gps_state", "acquiring"),
"arduino_connected": arduino.connected,
"lte_connected": lte.connected,
"lte_signal": lte_latest.get("signal"),
"ws_clients": len(connected_clients),
})
@@ -200,6 +227,12 @@ def gps_history():
return jsonify(gps.get_buffer())
@app.route("/lte")
def lte_data():
"""Current LTE modem status."""
return jsonify(lte.get_latest())
@app.route("/arduino")
def arduino_data():
"""Current Arduino telemetry (voltage, rpm, etc)."""
@@ -222,11 +255,13 @@ def main():
arduino.set_on_data(on_arduino_data)
arduino.set_on_ack(on_arduino_ack)
gps.set_on_data(on_gps_data)
lte.set_on_data(on_lte_data)
# Start services
gps.start()
arduino.start()
gpio.start()
lte.start()
# Start throttle flusher in background
socketio.start_background_task(throttle_flusher)
@@ -239,6 +274,7 @@ def main():
arduino.stop()
gps.stop()
gpio.stop()
lte.stop()
if __name__ == "__main__":

View File

@@ -0,0 +1,49 @@
"""Quick AT command terminal - minicom but less hostile.
Usage:
python at_terminal.py [port]
Default port: /dev/ttyUSB2 (SIM7600 AT command interface)
"""
import sys
import serial
import threading
PORT = sys.argv[1] if len(sys.argv) > 1 else "/dev/ttyUSB2"
BAUD = 115200
def reader(ser):
"""Background thread: print everything the modem sends."""
while True:
try:
data = ser.read(ser.in_waiting or 1)
if data:
sys.stdout.write(data.decode("utf-8", errors="replace"))
sys.stdout.flush()
except Exception:
break
def main():
print(f"Opening {PORT} @ {BAUD} baud")
print("Type AT commands. Ctrl+C to quit.\n")
ser = serial.Serial(PORT, BAUD, timeout=0.1)
t = threading.Thread(target=reader, args=(ser,), daemon=True)
t.start()
try:
while True:
line = input()
ser.write((line + "\r\n").encode())
except (KeyboardInterrupt, EOFError):
print("\nBye.")
finally:
ser.close()
if __name__ == "__main__":
main()

View File

@@ -41,6 +41,7 @@ class _AppRootState extends State<AppRoot> {
// Show all items from the start so the row doesn't jump around
_updateStatus('Config', '...');
_updateStatus('UART', '...');
_updateStatus('GPS', '...');
_updateStatus('Navigator', '...');
// Config must load first (everything else depends on it)
@@ -48,11 +49,13 @@ class _AppRootState extends State<AppRoot> {
await ConfigService.instance.load();
_updateStatus('Config', 'Ready');
// UART health check and navigator image preload run truly in parallel
// UART, GPS, and navigator image preload run truly in parallel
_updateStatus('UART', 'Connecting');
_updateStatus('GPS', 'Waiting');
_updateStatus('Navigator', 'Loading');
await Future.wait([
_waitForUart(),
_waitForGps(),
_preloadNavigatorImages(),
]);
@@ -100,6 +103,39 @@ class _AppRootState extends State<AppRoot> {
_updateStatus('UART', 'Timeout');
}
/// Poll backend health endpoint until GPS has a fix, or bail after 7.5s
Future<void> _waitForGps() async {
final backendUrl = ConfigService.instance.backendUrl;
const bailOut = Duration(milliseconds: 7500);
const retryDelay = Duration(seconds: 1);
final deadline = DateTime.now().add(bailOut);
_updateStatus('GPS', 'Acquiring');
while (DateTime.now().isBefore(deadline)) {
try {
final response = await http
.get(Uri.parse('$backendUrl/health'))
.timeout(const Duration(seconds: 2));
if (response.statusCode == 200) {
final data = jsonDecode(response.body) as Map<String, dynamic>;
if (data['gps_state'] == 'fix') {
_updateStatus('GPS', 'Ready');
return;
}
}
} catch (e) {
// Backend not reachable yet - keep trying
}
await Future.delayed(retryDelay);
}
// Bail out - dashboard will show live GPS state when it arrives
_updateStatus('GPS', 'Timeout');
}
/// Preload navigator images into Flutter's image cache
///
/// Scans for all PNGs in the navigator folder and precaches them.

View File

@@ -37,6 +37,7 @@ class _DashboardScreenState extends State<DashboardScreen> {
// WebSocket stream subscriptions
StreamSubscription<ArduinoData>? _arduinoSub;
StreamSubscription<GpsData>? _gpsSub;
StreamSubscription<LteData>? _lteSub;
StreamSubscription<WsConnectionState>? _connectionSub;
// Pi temperature - direct file read (safety critical)
@@ -60,6 +61,7 @@ class _DashboardScreenState extends State<DashboardScreen> {
// Placeholder values for system bar
int? _gpsSatellites;
String? _gpsState;
int? _lteSignal;
// WebSocket connection state
@@ -109,6 +111,14 @@ class _DashboardScreenState extends State<DashboardScreen> {
_gpsSpeed = data.speed;
_gpsTrack = data.track;
_gpsSatellites = data.satellites;
_gpsState = data.gpsState;
});
});
// Subscribe to LTE data stream
_lteSub = WebSocketService.instance.lteStream.listen((data) {
setState(() {
_lteSignal = data.signal;
});
});
@@ -144,12 +154,16 @@ class _DashboardScreenState extends State<DashboardScreen> {
_gpsSpeed = cachedGps.speed;
_gpsTrack = cachedGps.track;
_gpsSatellites = cachedGps.satellites;
_gpsState = cachedGps.gpsState;
}
_wsState = WebSocketService.instance.connectionState;
// Placeholder: LTE signal (TODO: wire up when LTE service exists)
_lteSignal = null;
// Init from cached LTE data
final cachedLte = WebSocketService.instance.latestLte;
if (cachedLte != null) {
_lteSignal = cachedLte.signal;
}
// DEBUG: flip-flop theme + navigator every 2s
TestFlipFlopService.instance.start(navigatorKey: _navigatorKey);
@@ -160,6 +174,7 @@ class _DashboardScreenState extends State<DashboardScreen> {
_piTempTimer?.cancel();
_arduinoSub?.cancel();
_gpsSub?.cancel();
_lteSub?.cancel();
_connectionSub?.cancel();
TestFlipFlopService.instance.stop();
super.dispose();
@@ -200,15 +215,14 @@ class _DashboardScreenState extends State<DashboardScreen> {
// System status bar
SystemBar(
gpsSatellites: _gpsSatellites,
gpsState: _gpsState,
lteSignal: _lteSignal,
piTemp: _piTemp,
voltage: _voltage,
wsState: _wsState,
),
const SizedBox(height: 2),
// Main content area - big stat boxes
// Main content area - big widgets
Expanded(
flex: 7,
child: Row(
@@ -239,7 +253,7 @@ class _DashboardScreenState extends State<DashboardScreen> {
mainAxisAlignment: MainAxisAlignment.spaceEvenly,
children: [
StatBox(value: _formatInt(_rpm), label: 'RPM', isWarning: () => (_rpm ?? 0) > 4000),
GpsCompass(heading: _gpsTrack),
GpsCompass(heading: _gpsTrack, gpsState: _gpsState),
StatBox(value: _formatGear(_gear), label: 'GEAR'),
],
),

View File

@@ -40,8 +40,9 @@ class GpsData {
final double? track;
final int? mode; // 0=no fix, 2=2D, 3=3D
final int? satellites;
final String? gpsState; // "acquiring", "fix", or "lost"
GpsData({this.lat, this.lon, this.speed, this.alt, this.track, this.mode, this.satellites});
GpsData({this.lat, this.lon, this.speed, this.alt, this.track, this.mode, this.satellites, this.gpsState});
factory GpsData.fromJson(Map<String, dynamic> json) {
return GpsData(
@@ -52,6 +53,26 @@ class GpsData {
track: (json['track'] as num?)?.toDouble(),
mode: (json['mode'] as num?)?.toInt(),
satellites: (json['satellites'] as num?)?.toInt(),
gpsState: json['gps_state'] as String?,
);
}
}
/// Data from LTE modem (signal quality, connection state)
class LteData {
final bool? connected;
final int? signal; // 0-100 percent
final String? operator_;
final String? accessTech;
LteData({this.connected, this.signal, this.operator_, this.accessTech});
factory LteData.fromJson(Map<String, dynamic> json) {
return LteData(
connected: json['connected'] as bool?,
signal: (json['signal'] as num?)?.toInt(),
operator_: json['operator'] as String?,
accessTech: json['access_tech'] as String?,
);
}
}

View File

@@ -65,11 +65,13 @@ class WebSocketService {
// Latest values for sync access (backward compat)
ArduinoData? _latestArduino;
GpsData? _latestGps;
LteData? _latestLte;
BackendStatus? _latestStatus;
// Stream controllers
late StreamController<ArduinoData> _arduinoController;
late StreamController<GpsData> _gpsController;
late StreamController<LteData> _lteController;
late StreamController<BackendStatus> _statusController;
late StreamController<CommandAck> _ackController;
late StreamController<BackendAlert> _alertController;
@@ -83,6 +85,7 @@ class WebSocketService {
void _setupStreams() {
_arduinoController = StreamController<ArduinoData>.broadcast();
_gpsController = StreamController<GpsData>.broadcast();
_lteController = StreamController<LteData>.broadcast();
_statusController = StreamController<BackendStatus>.broadcast();
_ackController = StreamController<CommandAck>.broadcast();
_alertController = StreamController<BackendAlert>.broadcast();
@@ -107,6 +110,9 @@ class WebSocketService {
/// Stream of GPS updates
Stream<GpsData> get gpsStream => _gpsController.stream;
/// Stream of LTE status updates
Stream<LteData> get lteStream => _lteController.stream;
/// Stream of backend status updates
Stream<BackendStatus> get statusStream => _statusController.stream;
@@ -139,6 +145,9 @@ class WebSocketService {
/// Latest GPS data (may be null if not yet received)
GpsData? get latestGps => _latestGps;
/// Latest LTE data (may be null if not yet received)
LteData? get latestLte => _latestLte;
/// Latest backend status
BackendStatus? get latestStatus => _latestStatus;
@@ -208,6 +217,15 @@ class WebSocketService {
}
});
_socket!.on('lte', (data) {
if (data is Map<String, dynamic>) {
final lte = LteData.fromJson(data);
_latestLte = lte;
_lteController.add(lte);
_log('lte: ${lte.signal ?? "-"}% ${lte.operator_ ?? "-"} ${lte.accessTech ?? "-"}');
}
});
_socket!.on('status', (data) {
if (data is Map<String, dynamic>) {
final status = BackendStatus(
@@ -323,6 +341,7 @@ class WebSocketService {
disconnect();
_arduinoController.close();
_gpsController.close();
_lteController.close();
_statusController.close();
_ackController.close();
_alertController.close();

View File

@@ -4,14 +4,16 @@ import '../theme/app_theme.dart';
class GpsCompass extends StatelessWidget {
final double? heading;
final String? gpsState; // "acquiring", "fix", "lost"
const GpsCompass({super.key, this.heading});
const GpsCompass({super.key, this.heading, this.gpsState});
bool get _hasSignal => heading != null;
bool get _isAcquiring => gpsState == 'acquiring';
String get _displayHeading {
if (!_hasSignal) return 'N/A'; // Just make it clear; redundant anyways, this only gets called when _hasSignal
return '${(heading! % 360).round()}'; // No need for the degree symbol
if (!_hasSignal) return 'N/A';
return '${(heading! % 360).round()}';
}
String get _compassDirection {
@@ -56,10 +58,10 @@ class GpsCompass extends StatelessWidget {
child: FittedBox(
fit: BoxFit.contain,
child: Text(
_hasSignal ? "${_displayHeading} ${_compassDirection}" : "N/A",
_hasSignal ? "${_displayHeading} ${_compassDirection}" : (_isAcquiring ? "ACQ" : "N/A"),
style: TextStyle(
fontSize: 80,
color: theme.subdued,
color: theme.subdued, // less emphasis on text, let the icon have semantic colour
fontFamily: 'DIN1451',
),
),

View File

@@ -7,6 +7,7 @@ import '../theme/app_theme.dart';
/// Shows GPS satellites, LTE signal, Pi temp, voltage, WS status at a glance.
class SystemBar extends StatelessWidget {
final int? gpsSatellites; // null = disconnected
final String? gpsState; // "acquiring", "fix", "lost"
final int? lteSignal; // null = disconnected, 0-4 bars
final double? piTemp; // null = unavailable
final double? voltage; // null = Arduino disconnected
@@ -15,6 +16,7 @@ class SystemBar extends StatelessWidget {
const SystemBar({
super.key,
this.gpsSatellites,
this.gpsState,
this.lteSignal,
this.piTemp,
this.voltage,
@@ -49,14 +51,6 @@ class SystemBar extends StatelessWidget {
return Container(
padding: const EdgeInsets.symmetric(horizontal: 24),
decoration: BoxDecoration(
border: Border(
bottom: BorderSide(
color: theme.subdued.withValues(alpha: 0.3),
width: 1,
),
),
),
child: Row(
crossAxisAlignment: CrossAxisAlignment.center,
children: [
@@ -73,8 +67,10 @@ class SystemBar extends StatelessWidget {
),
_Indicator(
label: 'GPS',
value: gpsSatellites?.toString() ?? 'N/A',
isAbnormal: gpsSatellites == null || gpsSatellites == 0,
value: gpsState == 'acquiring' ? 'ACQ'
: gpsState == 'fix' ? (gpsSatellites?.toString() ?? 'N/A')
: '0', // lost or unknown
isAbnormal: gpsState != 'fix' || gpsSatellites == null,
alignment: Alignment.centerLeft,
labelSize: labelSize,
valueSize: valueSize,
@@ -106,7 +102,7 @@ class SystemBar extends StatelessWidget {
_Indicator(
label: 'Mains',
value: voltage != null ? '${voltage!.toStringAsFixed(1)} V' : 'N/A',
isAbnormal: voltage == null || voltage! < 11.9,
isAbnormal: voltage == null || voltage! < 11.7 || voltage! > 14.5,
alignment: Alignment.centerLeft,
labelSize: labelSize,
valueSize: valueSize,