"""Generic ESP32 board driver (custom firmware). This is a template driver for custom ESP32 firmware. Copy this folder, rename it, and implement the HTTP endpoints to match your own firmware's API. Expected firmware endpoints (same shape as Olimex by default): POST /relay/on?relay= POST /relay/off?relay= POST /relay/toggle?relay= GET /relay/status?relay= → {"state": } GET /input/status?input= → {"state": } POST /register?callback_url= """ from __future__ import annotations import logging import requests from typing import TYPE_CHECKING from app.drivers.base import BoardDriver if TYPE_CHECKING: from app.models.board import Board logger = logging.getLogger(__name__) _TIMEOUT = 3 def _get(url): try: r = requests.get(url, timeout=_TIMEOUT) r.raise_for_status() return r.json() except Exception as exc: logger.debug("GET %s → %s", url, exc) return None def _post(url): try: r = requests.post(url, timeout=_TIMEOUT) r.raise_for_status() return r.json() except Exception as exc: logger.debug("POST %s → %s", url, exc) return None class GenericESP32Driver(BoardDriver): """Generic ESP32 driver — uses the same REST conventions as the Olimex board. Customise the endpoint paths below to match your firmware.""" DRIVER_ID = "generic_esp32" DISPLAY_NAME = "Generic ESP32" DESCRIPTION = "Custom ESP32 firmware · same REST API shape as Olimex" DEFAULT_NUM_RELAYS = 4 DEFAULT_NUM_INPUTS = 4 def get_relay_status(self, board: "Board", relay_num: int) -> bool | None: data = _get(f"{board.base_url}/relay/status?relay={relay_num}") return bool(data["state"]) if data is not None else None def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool: action = "on" if state else "off" return _post(f"{board.base_url}/relay/{action}?relay={relay_num}") is not None def toggle_relay(self, board: "Board", relay_num: int) -> bool | None: data = _post(f"{board.base_url}/relay/toggle?relay={relay_num}") return bool(data["state"]) if data is not None else None def poll(self, board: "Board") -> dict: relay_states, input_states, online = {}, {}, False for n in range(1, board.num_relays + 1): data = _get(f"{board.base_url}/relay/status?relay={n}") if data is not None: relay_states[f"relay_{n}"] = bool(data.get("state", False)) online = True for n in range(1, board.num_inputs + 1): data = _get(f"{board.base_url}/input/status?input={n}") if data is not None: input_states[f"input_{n}"] = bool(data.get("state", False)) online = True return {"relay_states": relay_states, "input_states": input_states, "is_online": online} def register_webhook(self, board: "Board", callback_url: str) -> bool: return _post(f"{board.base_url}/register?callback_url={callback_url}") is not None