ui/backend: theme switch using GPIO
This commit is contained in:
92
pi/backend/gpio_service.py
Normal file
92
pi/backend/gpio_service.py
Normal file
@@ -0,0 +1,92 @@
|
||||
"""GPIO service for Pi Zero - edge-triggered monitoring.
|
||||
|
||||
Polls GPIO pins and exposes state changes for inclusion in other payloads.
|
||||
Keeps UART separate (handled by arduino_service).
|
||||
"""
|
||||
|
||||
import gevent
|
||||
|
||||
try:
|
||||
import RPi.GPIO as GPIO
|
||||
_GPIO_AVAILABLE = True
|
||||
except ImportError:
|
||||
_GPIO_AVAILABLE = False
|
||||
print("[GPIO] RPi.GPIO not available - running in mock mode")
|
||||
|
||||
|
||||
# Pin assignments
|
||||
PIN_THEME_SWITCH = 20 # Physical switch for light/dark theme
|
||||
|
||||
|
||||
class GPIOService:
|
||||
"""Monitors GPIO pins and tracks state changes."""
|
||||
|
||||
def __init__(self):
|
||||
self._running = False
|
||||
self._greenlet = None
|
||||
|
||||
# Theme switch state
|
||||
self._theme_switch_state = False # False = light, True = dark
|
||||
self._theme_switch_pending = None # None = no change, bool = new value
|
||||
|
||||
if _GPIO_AVAILABLE:
|
||||
GPIO.setmode(GPIO.BCM)
|
||||
GPIO.setwarnings(False)
|
||||
# Input with software pull-down (belt + suspenders with hardware pulldown)
|
||||
GPIO.setup(PIN_THEME_SWITCH, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
|
||||
|
||||
def start(self):
|
||||
"""Start background polling."""
|
||||
if self._running:
|
||||
return
|
||||
self._running = True
|
||||
|
||||
# Read initial state
|
||||
if _GPIO_AVAILABLE:
|
||||
self._theme_switch_state = GPIO.input(PIN_THEME_SWITCH) == GPIO.HIGH
|
||||
else:
|
||||
self._theme_switch_state = True # Mock: default dark
|
||||
|
||||
self._greenlet = gevent.spawn(self._poll_loop)
|
||||
print(f"[GPIO] Started, theme_switch initial={self._theme_switch_state}")
|
||||
|
||||
def stop(self):
|
||||
"""Stop background polling."""
|
||||
self._running = False
|
||||
if self._greenlet:
|
||||
self._greenlet.kill()
|
||||
self._greenlet = None
|
||||
if _GPIO_AVAILABLE:
|
||||
GPIO.cleanup([PIN_THEME_SWITCH])
|
||||
|
||||
def _poll_loop(self):
|
||||
"""Poll GPIO at ~20Hz, detect edges."""
|
||||
while self._running:
|
||||
gevent.sleep(0.05) # 20Hz
|
||||
|
||||
if _GPIO_AVAILABLE:
|
||||
current = GPIO.input(PIN_THEME_SWITCH) == GPIO.HIGH
|
||||
else:
|
||||
current = self._theme_switch_state # Mock: no change
|
||||
|
||||
if current != self._theme_switch_state:
|
||||
self._theme_switch_state = current
|
||||
self._theme_switch_pending = current
|
||||
print(f"[GPIO] Theme switch changed to {current}")
|
||||
|
||||
def get_theme_switch_change(self):
|
||||
"""Get pending theme switch change, if any.
|
||||
|
||||
Returns:
|
||||
bool or None: New state if changed since last call, None otherwise.
|
||||
"""
|
||||
if self._theme_switch_pending is not None:
|
||||
value = self._theme_switch_pending
|
||||
self._theme_switch_pending = None
|
||||
return value
|
||||
return None
|
||||
|
||||
@property
|
||||
def theme_switch(self):
|
||||
"""Current theme switch state (True = dark, False = light)."""
|
||||
return self._theme_switch_state
|
||||
@@ -8,6 +8,7 @@ from flask_socketio import SocketIO, emit
|
||||
|
||||
from gps_service import GPSService
|
||||
from arduino_service import ArduinoService
|
||||
from gpio_service import GPIOService
|
||||
from throttle import Throttle
|
||||
|
||||
app = Flask(__name__)
|
||||
@@ -19,6 +20,7 @@ socketio = SocketIO(app, async_mode="gevent", cors_allowed_origins="*")
|
||||
# Services
|
||||
gps = GPSService()
|
||||
arduino = ArduinoService()
|
||||
gpio = GPIOService()
|
||||
|
||||
# Throttles for emission rate limiting (20Hz for arduino, 1Hz for GPS)
|
||||
arduino_throttle = Throttle(min_interval=0.05) # 20Hz max
|
||||
@@ -43,6 +45,7 @@ def handle_connect():
|
||||
emit("status", {
|
||||
"gps_connected": gps.connected,
|
||||
"arduino_connected": arduino.connected,
|
||||
"theme_switch": gpio.theme_switch,
|
||||
})
|
||||
|
||||
# Send latest data if available
|
||||
@@ -126,6 +129,12 @@ def handle_emergency(data):
|
||||
|
||||
def on_arduino_data(data):
|
||||
"""Called by ArduinoService when new telemetry arrives."""
|
||||
# Check for GPIO state changes to piggyback on this emit
|
||||
theme_change = gpio.get_theme_switch_change()
|
||||
if theme_change is not None:
|
||||
data = dict(data) # Don't mutate original
|
||||
data["theme_switch"] = theme_change
|
||||
|
||||
def emit_fn(d):
|
||||
socketio.emit("arduino", d)
|
||||
|
||||
@@ -219,6 +228,7 @@ def main():
|
||||
# Start services
|
||||
gps.start()
|
||||
arduino.start()
|
||||
gpio.start()
|
||||
|
||||
# Start throttle flusher in background
|
||||
socketio.start_background_task(throttle_flusher)
|
||||
@@ -230,6 +240,7 @@ def main():
|
||||
finally:
|
||||
arduino.stop()
|
||||
gps.stop()
|
||||
gpio.stop()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
Reference in New Issue
Block a user