gps: debug stub mode with satellites field and signal loss simulation

- _GPS_DEBUG flag for development without hardware
- stub mode: realistic mock data with occasional signal loss
- satellites field in backend and UI data models
- periodic status logging

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
Mikkeli Matlock
2026-02-08 03:04:44 +09:00
parent 9173c3b93a
commit 896ba322c0
3 changed files with 147 additions and 20 deletions

View File

@@ -1,10 +1,18 @@
"""GPS service - connects to gpsd, buffers data, handles reconnection.""" """GPS service - connects to gpsd, buffers data, handles reconnection."""
import random
import threading import threading
import time import time
from collections import deque from collections import deque
from typing import Any from typing import Any
# ============================================================================
# DEBUG MODE - Set True for development without GPS hardware
# When True: skips gpsd entirely, generates realistic mock data
# When False: connects to real gpsd (requires GPS device)
# ============================================================================
_GPS_DEBUG = True
# gpsdclient is a modern, simple gpsd client # gpsdclient is a modern, simple gpsd client
# Install gpsd on Pi: sudo apt install gpsd gpsd-clients # Install gpsd on Pi: sudo apt install gpsd gpsd-clients
# Configure: sudo nano /etc/default/gpsd (set DEVICES="/dev/ttyUSB0" or similar) # Configure: sudo nano /etc/default/gpsd (set DEVICES="/dev/ttyUSB0" or similar)
@@ -32,6 +40,10 @@ class GPSService:
# Callback for push-based updates # Callback for push-based updates
self._on_data_callback = None self._on_data_callback = None
# Periodic status logging
self._last_status_log = 0.0
self._fix_count = 0
def set_on_data(self, callback): def set_on_data(self, callback):
"""Set callback for new GPS fix. Called with fix dict.""" """Set callback for new GPS fix. Called with fix dict."""
self._on_data_callback = callback self._on_data_callback = callback
@@ -57,6 +69,7 @@ class GPSService:
self._running = True self._running = True
self._thread = threading.Thread(target=self._reader_loop, daemon=True) self._thread = threading.Thread(target=self._reader_loop, daemon=True)
self._thread.start() self._thread.start()
print("[GPS] Service started")
def stop(self): def stop(self):
"""Stop background reader.""" """Stop background reader."""
@@ -66,6 +79,7 @@ class GPSService:
def _reader_loop(self): def _reader_loop(self):
"""Main reader loop with reconnection logic.""" """Main reader loop with reconnection logic."""
print("[GPS] Reader thread running")
while self._running: while self._running:
try: try:
self._connect_and_read() self._connect_and_read()
@@ -76,23 +90,40 @@ class GPSService:
def _connect_and_read(self): def _connect_and_read(self):
"""Connect to gpsd and read data.""" """Connect to gpsd and read data."""
# Debug mode: skip gpsd entirely, use stub data
if _GPS_DEBUG:
print("[GPS] Debug mode enabled, using stub data")
self._stub_mode()
return
if GPSDClient is None: if GPSDClient is None:
# Stub mode - no gpsd client installed
print("[GPS] gpsdclient not installed, running in stub mode") print("[GPS] gpsdclient not installed, running in stub mode")
self._stub_mode() self._stub_mode()
return return
# Quick check if gpsd is reachable before attempting connection
import socket
try:
sock = socket.create_connection((self.host, self.port), timeout=2.0)
sock.close()
except (socket.timeout, socket.error, OSError) as e:
print(f"[GPS] gpsd not reachable at {self.host}:{self.port}: {e}")
raise ConnectionError(f"gpsd not reachable: {e}")
try: try:
client = GPSDClient(host=self.host, port=self.port) client = GPSDClient(host=self.host, port=self.port)
except Exception as e: except Exception as e:
print(f"[GPS] Cannot connect to gpsd at {self.host}:{self.port}: {e}, falling back to stub mode") print(f"[GPS] Cannot connect to gpsd at {self.host}:{self.port}: {e}")
self._stub_mode() raise ConnectionError(f"gpsd connection failed: {e}")
return
with client: with client:
self._connected = True self._connected = True
print(f"[GPS] Connected to gpsd at {self.host}:{self.port}") print(f"[GPS] Connected to gpsd at {self.host}:{self.port}")
self._last_status_log = time.time()
self._fix_count = 0
first_fix_timeout = time.time() + 5.0 # 5s to get first fix
for result in client.dict_stream(filter=["TPV"]): for result in client.dict_stream(filter=["TPV"]):
if not self._running: if not self._running:
break break
@@ -106,8 +137,20 @@ class GPSService:
"speed": result.get("speed"), # m/s "speed": result.get("speed"), # m/s
"track": result.get("track"), # heading in degrees "track": result.get("track"), # heading in degrees
"mode": result.get("mode"), # 0=no fix, 2=2D, 3=3D "mode": result.get("mode"), # 0=no fix, 2=2D, 3=3D
"satellites": result.get("satellites"), # from SKY messages
} }
# Check if this is a real fix (has position) or just empty TPV
if fix.get("lat") is None and fix.get("mode") in (None, 0, 1):
# No real data yet, check timeout
if time.time() > first_fix_timeout:
print("[GPS] No GPS fix after 5s, will retry connection")
raise ConnectionError("No GPS fix within timeout")
continue # Skip empty fixes
# Got real data, disable timeout
first_fix_timeout = float('inf')
with self._lock: with self._lock:
self._latest = fix self._latest = fix
if fix.get("lat") is not None: if fix.get("lat") is not None:
@@ -117,27 +160,110 @@ class GPSService:
if self._on_data_callback: if self._on_data_callback:
self._on_data_callback(fix) self._on_data_callback(fix)
# Periodic status log (every 5s)
self._fix_count += 1
now = time.time()
if now - self._last_status_log >= 5.0:
elapsed = now - self._last_status_log
fps = self._fix_count / elapsed
speed = fix.get('speed', 0) or 0
track = fix.get('track', 0) or 0
mode = fix.get('mode', 0) or 0
sats = fix.get('satellites', '?')
print(f"[GPS] {fps:.1f} fix/s | {speed:.1f}m/s hdg={track:.0f}° mode={mode} sats={sats}")
self._last_status_log = now
self._fix_count = 0
def _stub_mode(self): def _stub_mode(self):
"""Fake data for testing without gpsd.""" """Generate realistic mock GPS data for development/testing.
import random
Simulates:
- Normal 3D fix with satellites
- Occasional signal loss (~3% chance per second, lasts ~5s)
- Wandering position near Tokyo
"""
self._last_status_log = time.time()
self._fix_count = 0
# Signal loss state
signal_lost = False
signal_lost_until = 0.0
# Base position (Tokyo area)
base_lat = 35.6762
base_lon = 139.6503
base_alt = 40.0
# Smoothly varying heading/speed
heading = random.uniform(0, 360)
speed = random.uniform(5, 15)
while self._running: while self._running:
self._connected = True self._connected = True
fix = { now = time.time()
"time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"lat": 35.6762 + random.uniform(-0.001, 0.001), # Check for signal loss simulation
"lon": 139.6503 + random.uniform(-0.001, 0.001), if signal_lost:
"alt": 40.0 + random.uniform(-5, 5), if now >= signal_lost_until:
"speed": random.uniform(0, 30), signal_lost = False
"track": random.uniform(0, 360), print("[GPS] Signal recovered (stub)")
"mode": 3, else:
} # ~30% chance per second to lose signal
if random.random() < 0.3:
signal_lost = True
signal_lost_until = now + 2 # fixed 2s loss
print("[GPS] Signal loss simulation (stub)")
if signal_lost:
# No fix - mode 1, no satellites, no track
# Note: use None, not float('nan') - NaN doesn't serialize to valid JSON
fix = {
"time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"lat": None,
"lon": None,
"alt": None,
"speed": None,
"track": None,
"mode": 1,
"satellites": 0,
}
else:
# Smoothly vary heading and speed
heading = (heading + random.uniform(1, 3)) % 360
speed = max(0, min(30, speed + random.uniform(-2, 2)))
fix = {
"time": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
"lat": base_lat + random.uniform(-0.001, 0.001),
"lon": base_lon + random.uniform(-0.001, 0.001),
"alt": base_alt + random.uniform(-5, 5),
"speed": speed,
"track": heading,
"mode": 3,
"satellites": random.randint(6, 12),
}
with self._lock: with self._lock:
self._latest = fix self._latest = fix
self._buffer.append(fix) if fix.get("lat") is not None:
self._buffer.append(fix)
# Invoke callback with new fix # Invoke callback with new fix
if self._on_data_callback: if self._on_data_callback:
self._on_data_callback(fix) self._on_data_callback(fix)
# Periodic status log (every 5s)
self._fix_count += 1
if now - self._last_status_log >= 5.0:
elapsed = now - self._last_status_log
fps = self._fix_count / elapsed
speed_val = fix.get('speed') or 0
track_val = fix.get('track')
track_str = f"{track_val:.0f}" if track_val is not None else "---"
mode = fix.get('mode', 0)
sats = fix.get('satellites', 0)
print(f"[GPS] {fps:.1f} fix/s | {speed_val:.1f}m/s hdg={track_str} mode={mode} sats={sats} (stub)")
self._last_status_log = now
self._fix_count = 0
time.sleep(1) time.sleep(1)

View File

@@ -108,8 +108,7 @@ class _DashboardScreenState extends State<DashboardScreen> {
setState(() { setState(() {
_gpsSpeed = data.speed; _gpsSpeed = data.speed;
_gpsTrack = data.track; _gpsTrack = data.track;
// Derive satellites from mode (placeholder logic) _gpsSatellites = data.satellites;
_gpsSatellites = data.mode == 3 ? 8 : (data.mode == 2 ? 4 : 0);
}); });
}); });
@@ -144,7 +143,7 @@ class _DashboardScreenState extends State<DashboardScreen> {
if (cachedGps != null) { if (cachedGps != null) {
_gpsSpeed = cachedGps.speed; _gpsSpeed = cachedGps.speed;
_gpsTrack = cachedGps.track; _gpsTrack = cachedGps.track;
_gpsSatellites = cachedGps.mode == 3 ? 8 : (cachedGps.mode == 2 ? 4 : 0); _gpsSatellites = cachedGps.satellites;
} }
_wsState = WebSocketService.instance.connectionState; _wsState = WebSocketService.instance.connectionState;

View File

@@ -39,8 +39,9 @@ class GpsData {
final double? alt; final double? alt;
final double? track; final double? track;
final int? mode; // 0=no fix, 2=2D, 3=3D final int? mode; // 0=no fix, 2=2D, 3=3D
final int? satellites;
GpsData({this.lat, this.lon, this.speed, this.alt, this.track, this.mode}); GpsData({this.lat, this.lon, this.speed, this.alt, this.track, this.mode, this.satellites});
factory GpsData.fromJson(Map<String, dynamic> json) { factory GpsData.fromJson(Map<String, dynamic> json) {
return GpsData( return GpsData(
@@ -50,6 +51,7 @@ class GpsData {
alt: (json['alt'] as num?)?.toDouble(), alt: (json['alt'] as num?)?.toDouble(),
track: (json['track'] as num?)?.toDouble(), track: (json['track'] as num?)?.toDouble(),
mode: (json['mode'] as num?)?.toInt(), mode: (json['mode'] as num?)?.toInt(),
satellites: (json['satellites'] as num?)?.toInt(),
); );
} }
} }