diff --git a/app/drivers/olimex_esp32_c6_evb/driver.py b/app/drivers/olimex_esp32_c6_evb/driver.py index 1351472..5879167 100644 --- a/app/drivers/olimex_esp32_c6_evb/driver.py +++ b/app/drivers/olimex_esp32_c6_evb/driver.py @@ -29,7 +29,11 @@ The board POSTs to the registered callback_url whenever an input changes: """ from __future__ import annotations +import hashlib +import hmac import logging +import time as _time +import urllib.parse import requests from typing import TYPE_CHECKING @@ -42,9 +46,9 @@ logger = logging.getLogger(__name__) _TIMEOUT = 3 -def _get(url: str) -> dict | None: +def _get(url: str, headers: dict | None = None) -> dict | None: try: - r = requests.get(url, timeout=_TIMEOUT) + r = requests.get(url, timeout=_TIMEOUT, headers=headers or {}) r.raise_for_status() return r.json() except Exception as exc: @@ -52,9 +56,9 @@ def _get(url: str) -> dict | None: return None -def _post(url: str) -> dict | None: +def _post(url: str, headers: dict | None = None) -> dict | None: try: - r = requests.post(url, timeout=_TIMEOUT) + r = requests.post(url, timeout=_TIMEOUT, headers=headers or {}) r.raise_for_status() return r.json() except Exception as exc: @@ -62,6 +66,18 @@ def _post(url: str) -> dict | None: return None +def _auth(board: "Board", method: str, url: str) -> dict: + """Build HMAC-SHA256 auth headers. Returns {} when no api_secret set.""" + secret: str = getattr(board, "api_secret", None) or "" + if not secret: + return {} + path = urllib.parse.urlparse(url).path + ts = str(int(_time.time())) + msg = f"{method}:{path}:{ts}".encode() + sig = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest() + return {"X-Request-Time": ts, "X-Request-Sig": sig} + + class OlimexESP32C6EVBDriver(BoardDriver): """Driver for the Olimex ESP32-C6-EVB board.""" @@ -75,15 +91,18 @@ class OlimexESP32C6EVBDriver(BoardDriver): # ── relay control ───────────────────────────────────────────────────────── def get_relay_status(self, board: "Board", relay_num: int) -> bool | None: - data = _get(f"{board.base_url}/relay/status?relay={relay_num}") + url = f"{board.base_url}/relay/status?relay={relay_num}" + data = _get(url, _auth(board, "GET", url)) return bool(data["state"]) if data is not None else None def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool: action = "on" if state else "off" - return _post(f"{board.base_url}/relay/{action}?relay={relay_num}") is not None + url = f"{board.base_url}/relay/{action}?relay={relay_num}" + return _post(url, _auth(board, "POST", url)) is not None def toggle_relay(self, board: "Board", relay_num: int) -> bool | None: - data = _post(f"{board.base_url}/relay/toggle?relay={relay_num}") + url = f"{board.base_url}/relay/toggle?relay={relay_num}" + data = _post(url, _auth(board, "POST", url)) return bool(data["state"]) if data is not None else None # ── poll ────────────────────────────────────────────────────────────────── @@ -99,12 +118,14 @@ class OlimexESP32C6EVBDriver(BoardDriver): input_states: dict = {} if board.num_relays > 0: - probe = _get(f"{board.base_url}/relay/status?relay=1") + url = f"{board.base_url}/relay/status?relay=1" + probe = _get(url, _auth(board, "GET", url)) if probe is None: return _offline relay_states["relay_1"] = bool(probe.get("state", False)) elif board.num_inputs > 0: - probe = _get(f"{board.base_url}/input/status?input=1") + url = f"{board.base_url}/input/status?input=1" + probe = _get(url, _auth(board, "GET", url)) if probe is None: return _offline input_states["input_1"] = bool(probe.get("state", False)) @@ -113,13 +134,15 @@ class OlimexESP32C6EVBDriver(BoardDriver): # Board is reachable — collect remaining endpoints for n in range(2, board.num_relays + 1): - data = _get(f"{board.base_url}/relay/status?relay={n}") + url = f"{board.base_url}/relay/status?relay={n}" + data = _get(url, _auth(board, "GET", url)) if data is not None: relay_states[f"relay_{n}"] = bool(data.get("state", False)) input_start = 2 if (board.num_relays == 0 and board.num_inputs > 0) else 1 for n in range(input_start, board.num_inputs + 1): - data = _get(f"{board.base_url}/input/status?input={n}") + url = f"{board.base_url}/input/status?input={n}" + data = _get(url, _auth(board, "GET", url)) if data is not None: input_states[f"input_{n}"] = bool(data.get("state", True)) @@ -133,7 +156,7 @@ class OlimexESP32C6EVBDriver(BoardDriver): def register_webhook(self, board: "Board", callback_url: str) -> bool: url = f"{board.base_url}/register?callback_url={callback_url}" - ok = _post(url) is not None + ok = _post(url, _auth(board, "POST", url)) is not None if ok: logger.info("Webhook registered on board '%s' \u2192 %s", board.name, callback_url) else: @@ -144,7 +167,8 @@ class OlimexESP32C6EVBDriver(BoardDriver): def get_nfc_status(self, board: "Board") -> dict | None: """Return current NFC reader status (last UID, access_state, auth config).""" - return _get(f"{board.base_url}/nfc/status") + url = f"{board.base_url}/nfc/status" + return _get(url, _auth(board, "GET", url)) def set_nfc_config( self, @@ -153,12 +177,7 @@ class OlimexESP32C6EVBDriver(BoardDriver): relay_num: int = 1, pulse_ms: int = 3000, ) -> bool: - """Push NFC access-control config to the board. - - auth_uid: authorized card UID (e.g. "04:AB:CD:EF"); empty = any card opens relay. - relay_num: which relay to open on a matching card (1-4). - pulse_ms: how long to hold the relay open in milliseconds (100-60000). - """ + """Push NFC access-control config to the board.""" import urllib.parse url = ( @@ -167,7 +186,7 @@ class OlimexESP32C6EVBDriver(BoardDriver): f"&relay={relay_num}" f"&pulse_ms={pulse_ms}" ) - result = _post(url) + result = _post(url, _auth(board, "POST", url)) if result: logger.info( "NFC config pushed to board '%s': uid='%s' relay=%d pulse=%dms", diff --git a/app/drivers/olimex_esp32_c6_evb_pn532/driver.py b/app/drivers/olimex_esp32_c6_evb_pn532/driver.py index 88923b5..1e1fa6f 100644 --- a/app/drivers/olimex_esp32_c6_evb_pn532/driver.py +++ b/app/drivers/olimex_esp32_c6_evb_pn532/driver.py @@ -37,7 +37,10 @@ The board also POSTs an NFC card event on every card detection: """ from __future__ import annotations +import hashlib +import hmac import logging +import time as _time import urllib.parse import requests from typing import TYPE_CHECKING @@ -51,9 +54,9 @@ logger = logging.getLogger(__name__) _TIMEOUT = 3 -def _get(url: str) -> dict | None: +def _get(url: str, headers: dict | None = None) -> dict | None: try: - r = requests.get(url, timeout=_TIMEOUT) + r = requests.get(url, timeout=_TIMEOUT, headers=headers or {}) r.raise_for_status() return r.json() except Exception as exc: @@ -61,9 +64,9 @@ def _get(url: str) -> dict | None: return None -def _post(url: str) -> dict | None: +def _post(url: str, headers: dict | None = None) -> dict | None: try: - r = requests.post(url, timeout=_TIMEOUT) + r = requests.post(url, timeout=_TIMEOUT, headers=headers or {}) r.raise_for_status() return r.json() except Exception as exc: @@ -71,6 +74,25 @@ def _post(url: str) -> dict | None: return None +def _auth(board: "Board", method: str, url: str) -> dict: + """Build HMAC-SHA256 auth headers for a request to the board. + + Returns an empty dict when no api_secret is configured on the board + so that unauthenticated boards continue to work without changes. + + Message format: "METHOD:path:unix_timestamp" + e.g. "GET:/nfc/status:1710512345" + """ + secret: str = getattr(board, "api_secret", None) or "" + if not secret: + return {} + path = urllib.parse.urlparse(url).path + ts = str(int(_time.time())) + msg = f"{method}:{path}:{ts}".encode() + sig = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest() + return {"X-Request-Time": ts, "X-Request-Sig": sig} + + class OlimexESP32C6EVBPn532Driver(BoardDriver): """Driver for the Olimex ESP32-C6-EVB board with PN532 NFC reader.""" @@ -84,15 +106,18 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver): # ── relay control ───────────────────────────────────────────────────────── def get_relay_status(self, board: "Board", relay_num: int) -> bool | None: - data = _get(f"{board.base_url}/relay/status?relay={relay_num}") + url = f"{board.base_url}/relay/status?relay={relay_num}" + data = _get(url, _auth(board, "GET", url)) return bool(data["state"]) if data is not None else None def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool: action = "on" if state else "off" - return _post(f"{board.base_url}/relay/{action}?relay={relay_num}") is not None + url = f"{board.base_url}/relay/{action}?relay={relay_num}" + return _post(url, _auth(board, "POST", url)) is not None def toggle_relay(self, board: "Board", relay_num: int) -> bool | None: - data = _post(f"{board.base_url}/relay/toggle?relay={relay_num}") + url = f"{board.base_url}/relay/toggle?relay={relay_num}" + data = _post(url, _auth(board, "POST", url)) return bool(data["state"]) if data is not None else None # ── poll ────────────────────────────────────────────────────────────────── @@ -104,12 +129,14 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver): input_states: dict = {} if board.num_relays > 0: - probe = _get(f"{board.base_url}/relay/status?relay=1") + url = f"{board.base_url}/relay/status?relay=1" + probe = _get(url, _auth(board, "GET", url)) if probe is None: return _offline relay_states["relay_1"] = bool(probe.get("state", False)) elif board.num_inputs > 0: - probe = _get(f"{board.base_url}/input/status?input=1") + url = f"{board.base_url}/input/status?input=1" + probe = _get(url, _auth(board, "GET", url)) if probe is None: return _offline input_states["input_1"] = bool(probe.get("state", False)) @@ -117,13 +144,15 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver): return _offline for n in range(2, board.num_relays + 1): - data = _get(f"{board.base_url}/relay/status?relay={n}") + url = f"{board.base_url}/relay/status?relay={n}" + data = _get(url, _auth(board, "GET", url)) if data is not None: relay_states[f"relay_{n}"] = bool(data.get("state", False)) input_start = 2 if (board.num_relays == 0 and board.num_inputs > 0) else 1 for n in range(input_start, board.num_inputs + 1): - data = _get(f"{board.base_url}/input/status?input={n}") + url = f"{board.base_url}/input/status?input={n}" + data = _get(url, _auth(board, "GET", url)) if data is not None: input_states[f"input_{n}"] = bool(data.get("state", True)) @@ -137,7 +166,7 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver): def register_webhook(self, board: "Board", callback_url: str) -> bool: url = f"{board.base_url}/register?callback_url={callback_url}" - ok = _post(url) is not None + ok = _post(url, _auth(board, "POST", url)) is not None if ok: logger.info("Webhook registered on board '%s' → %s", board.name, callback_url) else: @@ -148,11 +177,13 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver): def get_nfc_status(self, board: "Board") -> dict | None: """Return current NFC reader status (last UID, access_state, auth config).""" - return _get(f"{board.base_url}/nfc/status") + url = f"{board.base_url}/nfc/status" + return _get(url, _auth(board, "GET", url)) def get_nfc_config(self, board: "Board") -> dict | None: """Return current NFC access-control configuration from the board.""" - return _get(f"{board.base_url}/nfc/config") + url = f"{board.base_url}/nfc/config" + return _get(url, _auth(board, "GET", url)) def set_nfc_config( self, @@ -173,7 +204,7 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver): f"&relay={relay_num}" f"&pulse_ms={pulse_ms}" ) - result = _post(url) + result = _post(url, _auth(board, "POST", url)) if result: logger.info( "NFC config pushed to board '%s': uid='%s' relay=%d pulse=%dms", diff --git a/app/models/board.py b/app/models/board.py index 1d3cae1..13528f3 100644 --- a/app/models/board.py +++ b/app/models/board.py @@ -43,6 +43,11 @@ class Board(db.Model): # Extra driver-specific config (JSON) – e.g. eWeLink credentials/token config_json = db.Column(db.Text, default="{}") + # HMAC-SHA256 shared secret for API request authentication. + # Must match API_SECRET defined in the board firmware's secrets.h. + # Leave None/empty to disable authentication (open access). + api_secret = db.Column(db.String(128), nullable=True, default=None) + # ── relationships ──────────────────────────────────────────────── workflows_trigger = db.relationship( "Workflow", foreign_keys="Workflow.trigger_board_id", diff --git a/app/routes/boards.py b/app/routes/boards.py index 6737a03..0259fe7 100644 --- a/app/routes/boards.py +++ b/app/routes/boards.py @@ -129,6 +129,7 @@ def edit_board(board_id: int): if lbl: labels[f"input_{n}"] = lbl board.labels = labels + board.api_secret = request.form.get("api_secret", "").strip() or None db.session.commit() flash("Board updated.", "success") diff --git a/app/templates/boards/edit.html b/app/templates/boards/edit.html index d350478..e32d4c9 100644 --- a/app/templates/boards/edit.html +++ b/app/templates/boards/edit.html @@ -76,6 +76,21 @@ {% endfor %} + +
API_SECRET in the board's secrets.h.