gps manipulations tailored to sim7600h hat

This commit is contained in:
2026-02-09 02:11:55 +09:00
parent 992270ed00
commit 629c735eec
20 changed files with 2503 additions and 2355 deletions

View File

@@ -1,61 +1,61 @@
"""Throttle layer for rate-limiting telemetry emissions."""
import time
from typing import Any, Callable
class Throttle:
"""Rate limiter for WebSocket emissions.
Coalesces rapid updates - only emits at most once per min_interval.
If multiple updates arrive within the interval, the latest value wins.
"""
def __init__(self, min_interval: float = 0.5):
"""
Args:
min_interval: Minimum seconds between emissions (default 0.5 = 2Hz max)
"""
self._last_emit: float = 0
self._min_interval = min_interval
self._pending: Any = None
def maybe_emit(self, data: Any, emit_fn: Callable[[Any], None]) -> bool:
"""Emit if interval has passed, otherwise store as pending.
Args:
data: Data to emit
emit_fn: Function to call with data when emitting
Returns:
True if emitted, False if stored as pending
"""
now = time.time()
if now - self._last_emit >= self._min_interval:
emit_fn(data)
self._last_emit = now
self._pending = None
return True
else:
self._pending = data # Latest value wins
return False
def flush(self, emit_fn: Callable[[Any], None]) -> bool:
"""Emit pending data if any.
Call this periodically to ensure pending data gets sent.
Returns:
True if pending data was emitted, False if nothing pending
"""
if self._pending is not None:
emit_fn(self._pending)
self._last_emit = time.time()
self._pending = None
return True
return False
@property
def has_pending(self) -> bool:
"""Check if there's pending data waiting to be emitted."""
return self._pending is not None
"""Throttle layer for rate-limiting telemetry emissions."""
import time
from typing import Any, Callable
class Throttle:
"""Rate limiter for WebSocket emissions.
Coalesces rapid updates - only emits at most once per min_interval.
If multiple updates arrive within the interval, the latest value wins.
"""
def __init__(self, min_interval: float = 0.5):
"""
Args:
min_interval: Minimum seconds between emissions (default 0.5 = 2Hz max)
"""
self._last_emit: float = 0
self._min_interval = min_interval
self._pending: Any = None
def maybe_emit(self, data: Any, emit_fn: Callable[[Any], None]) -> bool:
"""Emit if interval has passed, otherwise store as pending.
Args:
data: Data to emit
emit_fn: Function to call with data when emitting
Returns:
True if emitted, False if stored as pending
"""
now = time.time()
if now - self._last_emit >= self._min_interval:
emit_fn(data)
self._last_emit = now
self._pending = None
return True
else:
self._pending = data # Latest value wins
return False
def flush(self, emit_fn: Callable[[Any], None]) -> bool:
"""Emit pending data if any.
Call this periodically to ensure pending data gets sent.
Returns:
True if pending data was emitted, False if nothing pending
"""
if self._pending is not None:
emit_fn(self._pending)
self._last_emit = time.time()
self._pending = None
return True
return False
@property
def has_pending(self) -> bool:
"""Check if there's pending data waiting to be emitted."""
return self._pending is not None