"""Generic ESP8266 board driver (custom firmware). Lighter board — defaults to 2 relays and 2 inputs. Adjust DEFAULT_NUM_RELAYS / DEFAULT_NUM_INPUTS to match your hardware. """ from __future__ import annotations import logging import requests from typing import TYPE_CHECKING from app.drivers.base import BoardDriver if TYPE_CHECKING: from app.models.board import Board logger = logging.getLogger(__name__) _TIMEOUT = 3 def _get(url): try: r = requests.get(url, timeout=_TIMEOUT) r.raise_for_status() return r.json() except Exception as exc: logger.debug("GET %s → %s", url, exc) return None def _post(url): try: r = requests.post(url, timeout=_TIMEOUT) r.raise_for_status() return r.json() except Exception as exc: logger.debug("POST %s → %s", url, exc) return None class GenericESP8266Driver(BoardDriver): """Generic ESP8266 driver — lighter board variant.""" DRIVER_ID = "generic_esp8266" DISPLAY_NAME = "Generic ESP8266" DESCRIPTION = "Custom ESP8266 firmware · 2 relays · 2 inputs (configurable)" DEFAULT_NUM_RELAYS = 2 DEFAULT_NUM_INPUTS = 2 def get_relay_status(self, board: "Board", relay_num: int) -> bool | None: data = _get(f"{board.base_url}/relay/status?relay={relay_num}") return bool(data["state"]) if data is not None else None def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool: action = "on" if state else "off" return _post(f"{board.base_url}/relay/{action}?relay={relay_num}") is not None def toggle_relay(self, board: "Board", relay_num: int) -> bool | None: data = _post(f"{board.base_url}/relay/toggle?relay={relay_num}") return bool(data["state"]) if data is not None else None def poll(self, board: "Board") -> dict: _offline = {"relay_states": {}, "input_states": {}, "is_online": False} relay_states, input_states = {}, {} # Fail-fast probe — bail immediately if board is unreachable if board.num_relays > 0: probe = _get(f"{board.base_url}/relay/status?relay=1") if probe is None: return _offline relay_states["relay_1"] = bool(probe.get("state", False)) elif board.num_inputs > 0: probe = _get(f"{board.base_url}/input/status?input=1") if probe is None: return _offline input_states["input_1"] = bool(probe.get("state", False)) else: return _offline for n in range(2, board.num_relays + 1): data = _get(f"{board.base_url}/relay/status?relay={n}") if data is not None: relay_states[f"relay_{n}"] = bool(data.get("state", False)) input_start = 2 if (board.num_relays == 0 and board.num_inputs > 0) else 1 for n in range(input_start, board.num_inputs + 1): data = _get(f"{board.base_url}/input/status?input={n}") if data is not None: input_states[f"input_{n}"] = bool(data.get("state", False)) return {"relay_states": relay_states, "input_states": input_states, "is_online": True} def register_webhook(self, board: "Board", callback_url: str) -> bool: return _post(f"{board.base_url}/register?callback_url={callback_url}") is not None