"""Olimex ESP32-C6-EVB + PN532 NFC reader board driver. Hardware -------- - 4 relays (outputs) - 4 digital inputs - PN532 NFC reader connected via UEXT1 (UART/HSU mode) UEXT1 pin 3 = GPIO4 → PN532 RXD UEXT1 pin 4 = GPIO5 → PN532 TXD DIP1 = 0, DIP2 = 0 (HSU mode) - HTTP REST API served directly by the board on port 80 API endpoints ------------- POST /relay/on?relay= → {"state": true} POST /relay/off?relay= → {"state": false} POST /relay/toggle?relay= → {"state": } GET /relay/status?relay= → {"state": } GET /input/status?input= → {"state": } POST /register?callback_url= → {"status": "ok"} GET /nfc/status → {"initialized": bool, "card_present": bool, "last_uid": str, "access_state": str, "auth_uid": str, "relay_num": int, "pulse_ms": int} GET /nfc/config → {"auth_uid": str, "relay_num": int, "pulse_ms": int} POST /nfc/config?auth_uid=&relay=&pulse_ms= → {"status": "ok", ...} Webhook (board → server) ------------------------ The board POSTs to the registered callback_url whenever an input changes: POST {"input": , "state": } The board also POSTs an NFC card event on every card detection: POST {"type": "nfc_card", "uid": "", "uptime": } """ from __future__ import annotations import hashlib import hmac import logging import time as _time import urllib.parse import requests from typing import TYPE_CHECKING from app.drivers.base import BoardDriver if TYPE_CHECKING: from app.models.board import Board logger = logging.getLogger(__name__) _TIMEOUT = 3 def _get(url: str, headers: dict | None = None) -> dict | None: try: r = requests.get(url, timeout=_TIMEOUT, headers=headers or {}) r.raise_for_status() return r.json() except Exception as exc: logger.debug("GET %s → %s", url, exc) return None def _post(url: str, headers: dict | None = None) -> dict | None: try: r = requests.post(url, timeout=_TIMEOUT, headers=headers or {}) r.raise_for_status() return r.json() except Exception as exc: logger.debug("POST %s → %s", url, exc) return None def _auth(board: "Board", method: str, url: str) -> dict: """Build HMAC-SHA256 auth headers for a request to the board. Returns an empty dict when no api_secret is configured on the board so that unauthenticated boards continue to work without changes. Message format: "METHOD:path:unix_timestamp" e.g. "GET:/nfc/status:1710512345" """ secret: str = getattr(board, "api_secret", None) or "" if not secret: return {} path = urllib.parse.urlparse(url).path ts = str(int(_time.time())) msg = f"{method}:{path}:{ts}".encode() sig = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest() return {"X-Request-Time": ts, "X-Request-Sig": sig} class OlimexESP32C6EVBPn532Driver(BoardDriver): """Driver for the Olimex ESP32-C6-EVB board with PN532 NFC reader.""" DRIVER_ID = "olimex_esp32_c6_evb_pn532" DISPLAY_NAME = "Olimex ESP32-C6-EVB + PN532 NFC" DESCRIPTION = "4 relays · 4 digital inputs · PN532 NFC reader (UEXT1/HSU) · HTTP REST + webhook callbacks" DEFAULT_NUM_RELAYS = 4 DEFAULT_NUM_INPUTS = 4 FIRMWARE_URL = "https://github.com/OLIMEX/ESP32-C6-EVB" # ── relay control ───────────────────────────────────────────────────────── def get_relay_status(self, board: "Board", relay_num: int) -> bool | None: url = f"{board.base_url}/relay/status?relay={relay_num}" data = _get(url, _auth(board, "GET", url)) return bool(data["state"]) if data is not None else None def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool: action = "on" if state else "off" url = f"{board.base_url}/relay/{action}?relay={relay_num}" return _post(url, _auth(board, "POST", url)) is not None def toggle_relay(self, board: "Board", relay_num: int) -> bool | None: url = f"{board.base_url}/relay/toggle?relay={relay_num}" data = _post(url, _auth(board, "POST", url)) return bool(data["state"]) if data is not None else None # ── poll ────────────────────────────────────────────────────────────────── def poll(self, board: "Board") -> dict: _offline = {"relay_states": {}, "input_states": {}, "is_online": False} relay_states: dict = {} input_states: dict = {} if board.num_relays > 0: url = f"{board.base_url}/relay/status?relay=1" probe = _get(url, _auth(board, "GET", url)) if probe is None: return _offline relay_states["relay_1"] = bool(probe.get("state", False)) elif board.num_inputs > 0: url = f"{board.base_url}/input/status?input=1" probe = _get(url, _auth(board, "GET", url)) if probe is None: return _offline input_states["input_1"] = bool(probe.get("state", False)) else: return _offline for n in range(2, board.num_relays + 1): url = f"{board.base_url}/relay/status?relay={n}" data = _get(url, _auth(board, "GET", url)) if data is not None: relay_states[f"relay_{n}"] = bool(data.get("state", False)) input_start = 2 if (board.num_relays == 0 and board.num_inputs > 0) else 1 for n in range(input_start, board.num_inputs + 1): url = f"{board.base_url}/input/status?input={n}" data = _get(url, _auth(board, "GET", url)) if data is not None: input_states[f"input_{n}"] = bool(data.get("state", True)) return { "relay_states": relay_states, "input_states": input_states, "is_online": True, } # ── webhook registration ────────────────────────────────────────────────── def register_webhook(self, board: "Board", callback_url: str) -> bool: url = f"{board.base_url}/register?callback_url={callback_url}" ok = _post(url, _auth(board, "POST", url)) is not None if ok: logger.info("Webhook registered on board '%s' → %s", board.name, callback_url) else: logger.warning("Webhook registration failed for board '%s'", board.name) return ok # ── NFC access control ──────────────────────────────────────────────────── def get_nfc_status(self, board: "Board") -> dict | None: """Return current NFC reader status (last UID, access_state, auth config).""" url = f"{board.base_url}/nfc/status" return _get(url, _auth(board, "GET", url)) def get_nfc_config(self, board: "Board") -> dict | None: """Return current NFC access-control configuration from the board.""" url = f"{board.base_url}/nfc/config" return _get(url, _auth(board, "GET", url)) def set_nfc_config( self, board: "Board", auth_uid: str = "", relay_num: int = 1, pulse_ms: int = 3000, ) -> bool: """Push NFC access-control config to the board. auth_uid: authorized card UID (e.g. "04:AB:CD:EF"); empty = no card authorized. relay_num: which relay to open on a matching card (1-4). pulse_ms: absence timeout — relay closes this many ms after card is removed (100-60000). """ url = ( f"{board.base_url}/nfc/config" f"?auth_uid={urllib.parse.quote(auth_uid.upper())}" f"&relay={relay_num}" f"&pulse_ms={pulse_ms}" ) result = _post(url, _auth(board, "POST", url)) if result: logger.info( "NFC config pushed to board '%s': uid='%s' relay=%d pulse=%dms", board.name, auth_uid, relay_num, pulse_ms, ) else: logger.warning("NFC config push failed for board '%s'", board.name) return result is not None