Files
location_managemet/app/drivers/olimex_esp32_c6_evb/driver.py
scheianu 36de1623c2 Add HMAC-SHA256 API authentication to board drivers and edit UI
- Both olimex_esp32_c6_evb and olimex_esp32_c6_evb_pn532 drivers now
  sign every API request with X-Request-Time / X-Request-Sig headers
  using HMAC-SHA256(api_secret, METHOD:path:unix_timestamp)
- Board model gains api_secret column (nullable, default None)
- boards.py edit route saves api_secret from form
- edit.html adds API Secret input with cryptographic Generate button
- If api_secret is empty/None, headers are omitted (backward compat)
2026-03-15 12:33:45 +02:00

198 lines
7.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Olimex ESP32-C6-EVB board driver.
Hardware
--------
- 4 relays (outputs)
- 4 digital inputs
- HTTP REST API served directly by the board on port 80
API endpoints
-------------
POST /relay/on?relay=<n> → {"state": true}
POST /relay/off?relay=<n> → {"state": false}
POST /relay/toggle?relay=<n> → {"state": <new>}
GET /relay/status?relay=<n> → {"state": <bool>}
GET /input/status?input=<n> → {"state": <bool>}
POST /register?callback_url=<url> → {"status": "ok"}
GET /nfc/status → {"initialized": bool, "card_present": bool,
"last_uid": str, "access_state": str,
"auth_uid": str, "relay_num": int,
"pulse_ms": int}
GET /nfc/config → {"auth_uid": str, "relay_num": int, "pulse_ms": int}
POST /nfc/config?auth_uid=&relay=&pulse_ms= → {"status": "ok", ...}
Webhook (board → server)
------------------------
The board POSTs to the registered callback_url whenever an input changes:
POST <callback_url>
{"input": <n>, "state": <bool>}
"""
from __future__ import annotations
import hashlib
import hmac
import logging
import time as _time
import urllib.parse
import requests
from typing import TYPE_CHECKING
from app.drivers.base import BoardDriver
if TYPE_CHECKING:
from app.models.board import Board
logger = logging.getLogger(__name__)
_TIMEOUT = 3
def _get(url: str, headers: dict | None = None) -> dict | None:
try:
r = requests.get(url, timeout=_TIMEOUT, headers=headers or {})
r.raise_for_status()
return r.json()
except Exception as exc:
logger.debug("GET %s%s", url, exc)
return None
def _post(url: str, headers: dict | None = None) -> dict | None:
try:
r = requests.post(url, timeout=_TIMEOUT, headers=headers or {})
r.raise_for_status()
return r.json()
except Exception as exc:
logger.debug("POST %s%s", url, exc)
return None
def _auth(board: "Board", method: str, url: str) -> dict:
"""Build HMAC-SHA256 auth headers. Returns {} when no api_secret set."""
secret: str = getattr(board, "api_secret", None) or ""
if not secret:
return {}
path = urllib.parse.urlparse(url).path
ts = str(int(_time.time()))
msg = f"{method}:{path}:{ts}".encode()
sig = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest()
return {"X-Request-Time": ts, "X-Request-Sig": sig}
class OlimexESP32C6EVBDriver(BoardDriver):
"""Driver for the Olimex ESP32-C6-EVB board."""
DRIVER_ID = "olimex_esp32_c6_evb"
DISPLAY_NAME = "Olimex ESP32-C6-EVB"
DESCRIPTION = "4 relays · 4 digital inputs · HTTP REST + webhook callbacks"
DEFAULT_NUM_RELAYS = 4
DEFAULT_NUM_INPUTS = 4
FIRMWARE_URL = "https://github.com/OLIMEX/ESP32-C6-EVB"
# ── relay control ─────────────────────────────────────────────────────────
def get_relay_status(self, board: "Board", relay_num: int) -> bool | None:
url = f"{board.base_url}/relay/status?relay={relay_num}"
data = _get(url, _auth(board, "GET", url))
return bool(data["state"]) if data is not None else None
def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool:
action = "on" if state else "off"
url = f"{board.base_url}/relay/{action}?relay={relay_num}"
return _post(url, _auth(board, "POST", url)) is not None
def toggle_relay(self, board: "Board", relay_num: int) -> bool | None:
url = f"{board.base_url}/relay/toggle?relay={relay_num}"
data = _post(url, _auth(board, "POST", url))
return bool(data["state"]) if data is not None else None
# ── poll ──────────────────────────────────────────────────────────────────
def poll(self, board: "Board") -> dict:
_offline = {"relay_states": {}, "input_states": {}, "is_online": False}
# ── Connectivity probe ─────────────────────────────────────────────
# Try the very first endpoint. If the board doesn't reply we know
# it's offline and skip all remaining requests (saving N × 3 s worth
# of connect timeouts on every recheck cycle).
relay_states: dict = {}
input_states: dict = {}
if board.num_relays > 0:
url = f"{board.base_url}/relay/status?relay=1"
probe = _get(url, _auth(board, "GET", url))
if probe is None:
return _offline
relay_states["relay_1"] = bool(probe.get("state", False))
elif board.num_inputs > 0:
url = f"{board.base_url}/input/status?input=1"
probe = _get(url, _auth(board, "GET", url))
if probe is None:
return _offline
input_states["input_1"] = bool(probe.get("state", False))
else:
return _offline
# Board is reachable — collect remaining endpoints
for n in range(2, board.num_relays + 1):
url = f"{board.base_url}/relay/status?relay={n}"
data = _get(url, _auth(board, "GET", url))
if data is not None:
relay_states[f"relay_{n}"] = bool(data.get("state", False))
input_start = 2 if (board.num_relays == 0 and board.num_inputs > 0) else 1
for n in range(input_start, board.num_inputs + 1):
url = f"{board.base_url}/input/status?input={n}"
data = _get(url, _auth(board, "GET", url))
if data is not None:
input_states[f"input_{n}"] = bool(data.get("state", True))
return {
"relay_states": relay_states,
"input_states": input_states,
"is_online": True,
}
# ── webhook registration ──────────────────────────────────────────────────
def register_webhook(self, board: "Board", callback_url: str) -> bool:
url = f"{board.base_url}/register?callback_url={callback_url}"
ok = _post(url, _auth(board, "POST", url)) is not None
if ok:
logger.info("Webhook registered on board '%s' \u2192 %s", board.name, callback_url)
else:
logger.warning("Webhook registration failed for board '%s'", board.name)
return ok
# ── NFC access control ──────────────────────────────────────────────────────
def get_nfc_status(self, board: "Board") -> dict | None:
"""Return current NFC reader status (last UID, access_state, auth config)."""
url = f"{board.base_url}/nfc/status"
return _get(url, _auth(board, "GET", url))
def set_nfc_config(
self,
board: "Board",
auth_uid: str = "",
relay_num: int = 1,
pulse_ms: int = 3000,
) -> bool:
"""Push NFC access-control config to the board."""
import urllib.parse
url = (
f"{board.base_url}/nfc/config"
f"?auth_uid={urllib.parse.quote(auth_uid.upper())}"
f"&relay={relay_num}"
f"&pulse_ms={pulse_ms}"
)
result = _post(url, _auth(board, "POST", url))
if result:
logger.info(
"NFC config pushed to board '%s': uid='%s' relay=%d pulse=%dms",
board.name, auth_uid, relay_num, pulse_ms,
)
else:
logger.warning("NFC config push failed for board '%s'", board.name)
return result is not None