Files
location_managemet/app/drivers/olimex_esp32_c6_evb/driver.py
2026-02-26 19:24:17 +02:00

118 lines
4.2 KiB
Python

"""Olimex ESP32-C6-EVB board driver.
Hardware
--------
- 4 relays (outputs)
- 4 digital inputs
- HTTP REST API served directly by the board on port 80
API endpoints
-------------
POST /relay/on?relay=<n> → {"state": true}
POST /relay/off?relay=<n> → {"state": false}
POST /relay/toggle?relay=<n> → {"state": <new>}
GET /relay/status?relay=<n> → {"state": <bool>}
GET /input/status?input=<n> → {"state": <bool>}
POST /register?callback_url=<url> → {"status": "ok"}
Webhook (board → server)
------------------------
The board POSTs to the registered callback_url whenever an input changes:
POST <callback_url>
{"input": <n>, "state": <bool>}
"""
from __future__ import annotations
import logging
import requests
from typing import TYPE_CHECKING
from app.drivers.base import BoardDriver
if TYPE_CHECKING:
from app.models.board import Board
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Passed as ``timeout=`` to the requests calls in _get/_post below
# (requests interprets a numeric timeout as seconds).
_TIMEOUT = 3
def _get(url: str) -> dict | None:
    """Send a GET request to the board and return the decoded JSON object.

    Args:
        url: Full URL of the board endpoint to query.

    Returns:
        The JSON object decoded from the response body, or ``None`` when the
        request fails, the board answers with an HTTP error status, the body
        is not valid JSON, or the JSON value is not an object.
    """
    try:
        response = requests.get(url, timeout=_TIMEOUT)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        # Deliberate best-effort boundary: any failure is reported to the
        # caller as ``None`` instead of propagating.
        logger.debug("GET %s failed: %s", url, exc)
        return None
    if not isinstance(payload, dict):
        # Valid JSON that is not an object would break callers that index
        # the reply by key, so treat it like a failed request.
        logger.debug(
            "GET %s returned non-object JSON (%s)", url, type(payload).__name__
        )
        return None
    return payload
def _post(url: str) -> dict | None:
    """Send a body-less POST request to the board and return the JSON reply.

    Args:
        url: Full URL of the board endpoint, including any query string.

    Returns:
        The JSON object decoded from the response body, or ``None`` when the
        request fails, the board answers with an HTTP error status, the body
        is not valid JSON, or the JSON value is not an object.
    """
    try:
        response = requests.post(url, timeout=_TIMEOUT)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        # Deliberate best-effort boundary: any failure is reported to the
        # caller as ``None`` instead of propagating.
        logger.debug("POST %s failed: %s", url, exc)
        return None
    if not isinstance(payload, dict):
        # Valid JSON that is not an object would break callers that index
        # the reply by key, so treat it like a failed request.
        logger.debug(
            "POST %s returned non-object JSON (%s)", url, type(payload).__name__
        )
        return None
    return payload
class OlimexESP32C6EVBDriver(BoardDriver):
    """Driver for the Olimex ESP32-C6-EVB board.

    Every operation is a single HTTP call made through the module-level
    ``_get`` / ``_post`` helpers, which return the decoded JSON reply or
    ``None`` on failure. Relay and input numbers are placed verbatim into
    the request's query string.
    """

    DRIVER_ID = "olimex_esp32_c6_evb"
    DISPLAY_NAME = "Olimex ESP32-C6-EVB"
    DESCRIPTION = "4 relays · 4 digital inputs · HTTP REST + webhook callbacks"
    DEFAULT_NUM_RELAYS = 4
    DEFAULT_NUM_INPUTS = 4
    FIRMWARE_URL = "https://github.com/OLIMEX/ESP32-C6-EVB"

    @staticmethod
    def _state_from_reply(data: object) -> bool | None:
        """Return the reply's ``"state"`` flag, or ``None`` if it is absent.

        A missing or malformed reply yields ``None`` rather than raising, so
        the public methods keep their ``bool | None`` contract.
        """
        if not isinstance(data, dict) or "state" not in data:
            return None
        return bool(data["state"])

    # ── relay control ─────────────────────────────────────────────────────────
    def get_relay_status(self, board: "Board", relay_num: int) -> bool | None:
        """Query the current state of one relay.

        Args:
            board: Board to query; its ``base_url`` is used to build the URL.
            relay_num: Relay number sent as the ``relay`` query parameter.

        Returns:
            The relay state reported by the board, or ``None`` when the
            request fails or the reply carries no ``"state"`` field.
        """
        data = _get(f"{board.base_url}/relay/status?relay={relay_num}")
        return self._state_from_reply(data)

    def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool:
        """Switch one relay on or off.

        Args:
            board: Board to command; its ``base_url`` is used to build the URL.
            relay_num: Relay number sent as the ``relay`` query parameter.
            state: ``True`` calls ``/relay/on``, ``False`` calls ``/relay/off``.

        Returns:
            ``True`` when the board answered the request, ``False`` otherwise.
            The state echoed in the reply is not inspected.
        """
        action = "on" if state else "off"
        return _post(f"{board.base_url}/relay/{action}?relay={relay_num}") is not None

    def toggle_relay(self, board: "Board", relay_num: int) -> bool | None:
        """Toggle one relay.

        Args:
            board: Board to command; its ``base_url`` is used to build the URL.
            relay_num: Relay number sent as the ``relay`` query parameter.

        Returns:
            The state reported by the board in its reply, or ``None`` when
            the request fails or the reply carries no ``"state"`` field.
        """
        data = _post(f"{board.base_url}/relay/toggle?relay={relay_num}")
        return self._state_from_reply(data)

    # ── poll ──────────────────────────────────────────────────────────────────
    def poll(self, board: "Board") -> dict:
        """Read every relay and input of the board, one request each.

        Relays ``1..board.num_relays`` and inputs ``1..board.num_inputs`` are
        queried in order. Failed requests are skipped, so the result may hold
        only a subset of the channels.

        Args:
            board: Board to poll; supplies ``base_url``, ``num_relays`` and
                ``num_inputs``.

        Returns:
            A dict with three keys:

            - ``"relay_states"``: ``{"relay_<n>": bool}`` for each relay
              that answered.
            - ``"input_states"``: ``{"input_<n>": bool}`` for each input
              that answered.
            - ``"is_online"``: ``True`` if at least one request succeeded.
        """
        relay_states: dict = {}
        input_states: dict = {}
        online = False

        for n in range(1, board.num_relays + 1):
            data = _get(f"{board.base_url}/relay/status?relay={n}")
            if data is not None:
                # A reply without "state" is recorded as relay off.
                relay_states[f"relay_{n}"] = bool(data.get("state", False))
                online = True

        for n in range(1, board.num_inputs + 1):
            data = _get(f"{board.base_url}/input/status?input={n}")
            if data is not None:
                # A reply without "state" is recorded as True.
                # NOTE(review): presumably the inputs idle high; confirm
                # against the firmware.
                input_states[f"input_{n}"] = bool(data.get("state", True))
                online = True

        return {
            "relay_states": relay_states,
            "input_states": input_states,
            "is_online": online,
        }

    # ── webhook registration ──────────────────────────────────────────────────
    def register_webhook(self, board: "Board", callback_url: str) -> bool:
        """Ask the board to POST input-change events to ``callback_url``.

        Args:
            board: Board to configure; supplies ``base_url`` and ``name``.
            callback_url: URL sent as the ``callback_url`` query parameter.

        Returns:
            ``True`` when the board answered the registration request,
            ``False`` otherwise. The outcome is also logged.
        """
        # NOTE(review): callback_url is inserted into the query string as-is.
        # A callback URL containing '&' or '#' would be split or truncated;
        # percent-encoding it is only safe if the firmware decodes query
        # values, so confirm that before changing this.
        url = f"{board.base_url}/register?callback_url={callback_url}"
        ok = _post(url) is not None
        if ok:
            logger.info(
                "Webhook registered on board '%s': %s", board.name, callback_url
            )
        else:
            logger.warning("Webhook registration failed for board '%s'", board.name)
        return ok