"""Olimex ESP32-C6-EVB board driver.

Hardware
--------
- 4 relays (outputs)
- 4 digital inputs
- HTTP REST API served directly by the board on port 80

API endpoints
-------------
POST /relay/on?relay=<n>                  → {"state": true}
POST /relay/off?relay=<n>                 → {"state": false}
POST /relay/toggle?relay=<n>              → {"state": <bool>}
GET  /relay/status?relay=<n>              → {"state": <bool>}
GET  /input/status?input=<n>              → {"state": <bool>}
POST /register?callback_url=<url>         → {"status": "ok"}

Webhook (board → server)
------------------------
The board POSTs to the registered callback_url whenever an input changes:

POST <callback_url>   {"input": <n>, "state": <bool>}
"""
from __future__ import annotations

import logging
from typing import TYPE_CHECKING
from urllib.parse import quote

import requests

from app.drivers.base import BoardDriver

if TYPE_CHECKING:
    from collections.abc import Callable

    from app.models.board import Board

logger = logging.getLogger(__name__)

# Per-request timeout in seconds, applied to every call made to the board.
_TIMEOUT = 3

# Characters left literal when the callback URL is embedded in the
# ``/register`` query string: everything RFC 3986 allows inside a query
# component except ``&`` and ``+`` (and, implicitly, ``#`` and ``%``), which
# would otherwise be misread as a parameter separator, a space, a fragment
# marker or an escape sequence.  Plain ``http://host:port/path`` callback URLs
# are therefore transmitted exactly as before.
_CALLBACK_SAFE_CHARS = "!$'()*,;=:@/?"


def _send(
    verb: str,
    sender: Callable[..., requests.Response],
    url: str,
) -> dict | None:
    """Issue one request to the board and return its decoded JSON body.

    Args:
        verb: HTTP method name, used only in the debug log line.
        sender: The ``requests`` function performing the call
            (``requests.get`` or ``requests.post``).
        url: Fully-built endpoint URL.

    Returns:
        The decoded JSON payload, or ``None`` when the request failed for any
        reason (connection error, timeout, HTTP error status, invalid JSON).
        The payload shape is not validated here; callers read it through
        ``_state_or`` so a malformed reply cannot raise.
    """
    try:
        response = sender(url, timeout=_TIMEOUT)
        response.raise_for_status()
        return response.json()
    except Exception as exc:
        # Deliberately best-effort: an unreachable or misbehaving board must
        # never propagate an exception into the caller's polling cycle.
        logger.debug("%s %s → %s", verb, url, exc)
        return None


def _get(url: str) -> dict | None:
    """GET *url*; return the JSON reply or ``None`` on any failure."""
    return _send("GET", requests.get, url)


def _post(url: str) -> dict | None:
    """POST to *url*; return the JSON reply or ``None`` on any failure."""
    return _send("POST", requests.post, url)


def _state_or(data: object, default: bool | None) -> bool | None:
    """Extract the boolean ``"state"`` field from a board reply.

    Args:
        data: Decoded JSON reply (expected to be a ``dict``).
        default: Value returned when *data* is not a JSON object or has no
            ``"state"`` key.

    Returns:
        ``bool(data["state"])`` when the field is present, else *default*.
    """
    if isinstance(data, dict) and "state" in data:
        return bool(data["state"])
    return default


class OlimexESP32C6EVBDriver(BoardDriver):
    """Driver for the Olimex ESP32-C6-EVB board."""

    DRIVER_ID = "olimex_esp32_c6_evb"
    DISPLAY_NAME = "Olimex ESP32-C6-EVB"
    DESCRIPTION = "4 relays · 4 digital inputs · HTTP REST + webhook callbacks"
    DEFAULT_NUM_RELAYS = 4
    DEFAULT_NUM_INPUTS = 4
    FIRMWARE_URL = "https://github.com/OLIMEX/ESP32-C6-EVB"

    # ── relay control ─────────────────────────────────────────────────────────

    def get_relay_status(self, board: "Board", relay_num: int) -> bool | None:
        """Return the current state of one relay.

        Returns:
            ``True``/``False`` as reported by the board, or ``None`` when the
            board did not answer or its reply carried no ``"state"`` field.
        """
        data = _get(f"{board.base_url}/relay/status?relay={relay_num}")
        return _state_or(data, None)

    def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool:
        """Switch one relay on or off.

        Returns:
            ``True`` when the board acknowledged the request, ``False``
            otherwise.
        """
        action = "on" if state else "off"
        reply = _post(f"{board.base_url}/relay/{action}?relay={relay_num}")
        return reply is not None

    def toggle_relay(self, board: "Board", relay_num: int) -> bool | None:
        """Invert one relay and return its new state.

        Returns:
            The state reported by the board after toggling, or ``None`` when
            the board did not answer or its reply carried no ``"state"``
            field.
        """
        data = _post(f"{board.base_url}/relay/toggle?relay={relay_num}")
        return _state_or(data, None)

    # ── poll ──────────────────────────────────────────────────────────────────

    def poll(self, board: "Board") -> dict:
        """Read every relay and input state from the board.

        Returns:
            A dict with three keys: ``"relay_states"`` (``"relay_<n>"`` →
            bool), ``"input_states"`` (``"input_<n>"`` → bool) and
            ``"is_online"``.  When the board is unreachable, or is configured
            with neither relays nor inputs, both state dicts are empty and
            ``"is_online"`` is ``False``.  Endpoints that fail after a
            successful probe are simply left out of the result.
        """
        offline = {"relay_states": {}, "input_states": {}, "is_online": False}
        relay_states: dict = {}
        input_states: dict = {}
        base = board.base_url

        # ── Connectivity probe ─────────────────────────────────────────────
        # Try the very first endpoint.  If the board doesn't reply we know
        # it's offline and skip all remaining requests (saving N × 3 s worth
        # of connect timeouts on every recheck cycle).
        if board.num_relays > 0:
            probe = _get(f"{base}/relay/status?relay=1")
            if probe is None:
                return offline
            relay_states["relay_1"] = _state_or(probe, False)
            first_input = 1
        elif board.num_inputs > 0:
            probe = _get(f"{base}/input/status?input=1")
            if probe is None:
                return offline
            input_states["input_1"] = _state_or(probe, False)
            # Input 1 has just been read by the probe; don't query it again.
            first_input = 2
        else:
            return offline

        # Board is reachable — collect remaining endpoints.
        for n in range(2, board.num_relays + 1):
            data = _get(f"{base}/relay/status?relay={n}")
            if data is not None:
                relay_states[f"relay_{n}"] = _state_or(data, False)

        for n in range(first_input, board.num_inputs + 1):
            data = _get(f"{base}/input/status?input={n}")
            if data is not None:
                # NOTE(review): a reply without "state" defaults to True here
                # but to False when input 1 is used as the probe above.  Kept
                # as-is; confirm which default is intended.
                input_states[f"input_{n}"] = _state_or(data, True)

        return {
            "relay_states": relay_states,
            "input_states": input_states,
            "is_online": True,
        }

    # ── webhook registration ──────────────────────────────────────────────────

    def register_webhook(self, board: "Board", callback_url: str) -> bool:
        """Ask the board to POST input changes to *callback_url*.

        The callback URL is percent-encoded before being placed in the query
        string so that a ``&``, ``#``, ``%`` or ``+`` inside it is delivered
        as part of the value instead of truncating or corrupting it.

        Returns:
            ``True`` when the board acknowledged the registration, ``False``
            otherwise.
        """
        encoded = quote(callback_url, safe=_CALLBACK_SAFE_CHARS)
        url = f"{board.base_url}/register?callback_url={encoded}"
        ok = _post(url) is not None
        if ok:
            logger.info(
                "Webhook registered on board '%s' → %s", board.name, callback_url
            )
        else:
            logger.warning(
                "Webhook registration failed for board '%s'", board.name
            )
        return ok