Add HMAC-SHA256 API authentication to board drivers and edit UI
- Both olimex_esp32_c6_evb and olimex_esp32_c6_evb_pn532 drivers now sign every API request with X-Request-Time / X-Request-Sig headers using HMAC-SHA256(api_secret, METHOD:path:unix_timestamp) - Board model gains api_secret column (nullable, default None) - boards.py edit route saves api_secret from form - edit.html adds API Secret input with cryptographic Generate button - If api_secret is empty/None, headers are omitted (backward compat)
This commit is contained in:
@@ -29,7 +29,11 @@ The board POSTs to the registered callback_url whenever an input changes:
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import hmac
|
||||
import logging
|
||||
import time as _time
|
||||
import urllib.parse
|
||||
import requests
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
@@ -42,9 +46,9 @@ logger = logging.getLogger(__name__)
|
||||
_TIMEOUT = 3
|
||||
|
||||
|
||||
def _get(url: str) -> dict | None:
|
||||
def _get(url: str, headers: dict | None = None) -> dict | None:
|
||||
try:
|
||||
r = requests.get(url, timeout=_TIMEOUT)
|
||||
r = requests.get(url, timeout=_TIMEOUT, headers=headers or {})
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
except Exception as exc:
|
||||
@@ -52,9 +56,9 @@ def _get(url: str) -> dict | None:
|
||||
return None
|
||||
|
||||
|
||||
def _post(url: str) -> dict | None:
|
||||
def _post(url: str, headers: dict | None = None) -> dict | None:
|
||||
try:
|
||||
r = requests.post(url, timeout=_TIMEOUT)
|
||||
r = requests.post(url, timeout=_TIMEOUT, headers=headers or {})
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
except Exception as exc:
|
||||
@@ -62,6 +66,18 @@ def _post(url: str) -> dict | None:
|
||||
return None
|
||||
|
||||
|
||||
def _auth(board: "Board", method: str, url: str) -> dict:
|
||||
"""Build HMAC-SHA256 auth headers. Returns {} when no api_secret set."""
|
||||
secret: str = getattr(board, "api_secret", None) or ""
|
||||
if not secret:
|
||||
return {}
|
||||
path = urllib.parse.urlparse(url).path
|
||||
ts = str(int(_time.time()))
|
||||
msg = f"{method}:{path}:{ts}".encode()
|
||||
sig = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest()
|
||||
return {"X-Request-Time": ts, "X-Request-Sig": sig}
|
||||
|
||||
|
||||
class OlimexESP32C6EVBDriver(BoardDriver):
|
||||
"""Driver for the Olimex ESP32-C6-EVB board."""
|
||||
|
||||
@@ -75,15 +91,18 @@ class OlimexESP32C6EVBDriver(BoardDriver):
|
||||
# ── relay control ─────────────────────────────────────────────────────────
|
||||
|
||||
def get_relay_status(self, board: "Board", relay_num: int) -> bool | None:
|
||||
data = _get(f"{board.base_url}/relay/status?relay={relay_num}")
|
||||
url = f"{board.base_url}/relay/status?relay={relay_num}"
|
||||
data = _get(url, _auth(board, "GET", url))
|
||||
return bool(data["state"]) if data is not None else None
|
||||
|
||||
def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool:
|
||||
action = "on" if state else "off"
|
||||
return _post(f"{board.base_url}/relay/{action}?relay={relay_num}") is not None
|
||||
url = f"{board.base_url}/relay/{action}?relay={relay_num}"
|
||||
return _post(url, _auth(board, "POST", url)) is not None
|
||||
|
||||
def toggle_relay(self, board: "Board", relay_num: int) -> bool | None:
|
||||
data = _post(f"{board.base_url}/relay/toggle?relay={relay_num}")
|
||||
url = f"{board.base_url}/relay/toggle?relay={relay_num}"
|
||||
data = _post(url, _auth(board, "POST", url))
|
||||
return bool(data["state"]) if data is not None else None
|
||||
|
||||
# ── poll ──────────────────────────────────────────────────────────────────
|
||||
@@ -99,12 +118,14 @@ class OlimexESP32C6EVBDriver(BoardDriver):
|
||||
input_states: dict = {}
|
||||
|
||||
if board.num_relays > 0:
|
||||
probe = _get(f"{board.base_url}/relay/status?relay=1")
|
||||
url = f"{board.base_url}/relay/status?relay=1"
|
||||
probe = _get(url, _auth(board, "GET", url))
|
||||
if probe is None:
|
||||
return _offline
|
||||
relay_states["relay_1"] = bool(probe.get("state", False))
|
||||
elif board.num_inputs > 0:
|
||||
probe = _get(f"{board.base_url}/input/status?input=1")
|
||||
url = f"{board.base_url}/input/status?input=1"
|
||||
probe = _get(url, _auth(board, "GET", url))
|
||||
if probe is None:
|
||||
return _offline
|
||||
input_states["input_1"] = bool(probe.get("state", False))
|
||||
@@ -113,13 +134,15 @@ class OlimexESP32C6EVBDriver(BoardDriver):
|
||||
|
||||
# Board is reachable — collect remaining endpoints
|
||||
for n in range(2, board.num_relays + 1):
|
||||
data = _get(f"{board.base_url}/relay/status?relay={n}")
|
||||
url = f"{board.base_url}/relay/status?relay={n}"
|
||||
data = _get(url, _auth(board, "GET", url))
|
||||
if data is not None:
|
||||
relay_states[f"relay_{n}"] = bool(data.get("state", False))
|
||||
|
||||
input_start = 2 if (board.num_relays == 0 and board.num_inputs > 0) else 1
|
||||
for n in range(input_start, board.num_inputs + 1):
|
||||
data = _get(f"{board.base_url}/input/status?input={n}")
|
||||
url = f"{board.base_url}/input/status?input={n}"
|
||||
data = _get(url, _auth(board, "GET", url))
|
||||
if data is not None:
|
||||
input_states[f"input_{n}"] = bool(data.get("state", True))
|
||||
|
||||
@@ -133,7 +156,7 @@ class OlimexESP32C6EVBDriver(BoardDriver):
|
||||
|
||||
def register_webhook(self, board: "Board", callback_url: str) -> bool:
|
||||
url = f"{board.base_url}/register?callback_url={callback_url}"
|
||||
ok = _post(url) is not None
|
||||
ok = _post(url, _auth(board, "POST", url)) is not None
|
||||
if ok:
|
||||
logger.info("Webhook registered on board '%s' \u2192 %s", board.name, callback_url)
|
||||
else:
|
||||
@@ -144,7 +167,8 @@ class OlimexESP32C6EVBDriver(BoardDriver):
|
||||
|
||||
def get_nfc_status(self, board: "Board") -> dict | None:
|
||||
"""Return current NFC reader status (last UID, access_state, auth config)."""
|
||||
return _get(f"{board.base_url}/nfc/status")
|
||||
url = f"{board.base_url}/nfc/status"
|
||||
return _get(url, _auth(board, "GET", url))
|
||||
|
||||
def set_nfc_config(
|
||||
self,
|
||||
@@ -153,12 +177,7 @@ class OlimexESP32C6EVBDriver(BoardDriver):
|
||||
relay_num: int = 1,
|
||||
pulse_ms: int = 3000,
|
||||
) -> bool:
|
||||
"""Push NFC access-control config to the board.
|
||||
|
||||
auth_uid: authorized card UID (e.g. "04:AB:CD:EF"); empty = any card opens relay.
|
||||
relay_num: which relay to open on a matching card (1-4).
|
||||
pulse_ms: how long to hold the relay open in milliseconds (100-60000).
|
||||
"""
|
||||
"""Push NFC access-control config to the board."""
|
||||
import urllib.parse
|
||||
|
||||
url = (
|
||||
@@ -167,7 +186,7 @@ class OlimexESP32C6EVBDriver(BoardDriver):
|
||||
f"&relay={relay_num}"
|
||||
f"&pulse_ms={pulse_ms}"
|
||||
)
|
||||
result = _post(url)
|
||||
result = _post(url, _auth(board, "POST", url))
|
||||
if result:
|
||||
logger.info(
|
||||
"NFC config pushed to board '%s': uid='%s' relay=%d pulse=%dms",
|
||||
|
||||
@@ -37,7 +37,10 @@ The board also POSTs an NFC card event on every card detection:
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import hmac
|
||||
import logging
|
||||
import time as _time
|
||||
import urllib.parse
|
||||
import requests
|
||||
from typing import TYPE_CHECKING
|
||||
@@ -51,9 +54,9 @@ logger = logging.getLogger(__name__)
|
||||
_TIMEOUT = 3
|
||||
|
||||
|
||||
def _get(url: str) -> dict | None:
|
||||
def _get(url: str, headers: dict | None = None) -> dict | None:
|
||||
try:
|
||||
r = requests.get(url, timeout=_TIMEOUT)
|
||||
r = requests.get(url, timeout=_TIMEOUT, headers=headers or {})
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
except Exception as exc:
|
||||
@@ -61,9 +64,9 @@ def _get(url: str) -> dict | None:
|
||||
return None
|
||||
|
||||
|
||||
def _post(url: str) -> dict | None:
|
||||
def _post(url: str, headers: dict | None = None) -> dict | None:
|
||||
try:
|
||||
r = requests.post(url, timeout=_TIMEOUT)
|
||||
r = requests.post(url, timeout=_TIMEOUT, headers=headers or {})
|
||||
r.raise_for_status()
|
||||
return r.json()
|
||||
except Exception as exc:
|
||||
@@ -71,6 +74,25 @@ def _post(url: str) -> dict | None:
|
||||
return None
|
||||
|
||||
|
||||
def _auth(board: "Board", method: str, url: str) -> dict:
|
||||
"""Build HMAC-SHA256 auth headers for a request to the board.
|
||||
|
||||
Returns an empty dict when no api_secret is configured on the board
|
||||
so that unauthenticated boards continue to work without changes.
|
||||
|
||||
Message format: "METHOD:path:unix_timestamp"
|
||||
e.g. "GET:/nfc/status:1710512345"
|
||||
"""
|
||||
secret: str = getattr(board, "api_secret", None) or ""
|
||||
if not secret:
|
||||
return {}
|
||||
path = urllib.parse.urlparse(url).path
|
||||
ts = str(int(_time.time()))
|
||||
msg = f"{method}:{path}:{ts}".encode()
|
||||
sig = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest()
|
||||
return {"X-Request-Time": ts, "X-Request-Sig": sig}
|
||||
|
||||
|
||||
class OlimexESP32C6EVBPn532Driver(BoardDriver):
|
||||
"""Driver for the Olimex ESP32-C6-EVB board with PN532 NFC reader."""
|
||||
|
||||
@@ -84,15 +106,18 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver):
|
||||
# ── relay control ─────────────────────────────────────────────────────────
|
||||
|
||||
def get_relay_status(self, board: "Board", relay_num: int) -> bool | None:
|
||||
data = _get(f"{board.base_url}/relay/status?relay={relay_num}")
|
||||
url = f"{board.base_url}/relay/status?relay={relay_num}"
|
||||
data = _get(url, _auth(board, "GET", url))
|
||||
return bool(data["state"]) if data is not None else None
|
||||
|
||||
def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool:
|
||||
action = "on" if state else "off"
|
||||
return _post(f"{board.base_url}/relay/{action}?relay={relay_num}") is not None
|
||||
url = f"{board.base_url}/relay/{action}?relay={relay_num}"
|
||||
return _post(url, _auth(board, "POST", url)) is not None
|
||||
|
||||
def toggle_relay(self, board: "Board", relay_num: int) -> bool | None:
|
||||
data = _post(f"{board.base_url}/relay/toggle?relay={relay_num}")
|
||||
url = f"{board.base_url}/relay/toggle?relay={relay_num}"
|
||||
data = _post(url, _auth(board, "POST", url))
|
||||
return bool(data["state"]) if data is not None else None
|
||||
|
||||
# ── poll ──────────────────────────────────────────────────────────────────
|
||||
@@ -104,12 +129,14 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver):
|
||||
input_states: dict = {}
|
||||
|
||||
if board.num_relays > 0:
|
||||
probe = _get(f"{board.base_url}/relay/status?relay=1")
|
||||
url = f"{board.base_url}/relay/status?relay=1"
|
||||
probe = _get(url, _auth(board, "GET", url))
|
||||
if probe is None:
|
||||
return _offline
|
||||
relay_states["relay_1"] = bool(probe.get("state", False))
|
||||
elif board.num_inputs > 0:
|
||||
probe = _get(f"{board.base_url}/input/status?input=1")
|
||||
url = f"{board.base_url}/input/status?input=1"
|
||||
probe = _get(url, _auth(board, "GET", url))
|
||||
if probe is None:
|
||||
return _offline
|
||||
input_states["input_1"] = bool(probe.get("state", False))
|
||||
@@ -117,13 +144,15 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver):
|
||||
return _offline
|
||||
|
||||
for n in range(2, board.num_relays + 1):
|
||||
data = _get(f"{board.base_url}/relay/status?relay={n}")
|
||||
url = f"{board.base_url}/relay/status?relay={n}"
|
||||
data = _get(url, _auth(board, "GET", url))
|
||||
if data is not None:
|
||||
relay_states[f"relay_{n}"] = bool(data.get("state", False))
|
||||
|
||||
input_start = 2 if (board.num_relays == 0 and board.num_inputs > 0) else 1
|
||||
for n in range(input_start, board.num_inputs + 1):
|
||||
data = _get(f"{board.base_url}/input/status?input={n}")
|
||||
url = f"{board.base_url}/input/status?input={n}"
|
||||
data = _get(url, _auth(board, "GET", url))
|
||||
if data is not None:
|
||||
input_states[f"input_{n}"] = bool(data.get("state", True))
|
||||
|
||||
@@ -137,7 +166,7 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver):
|
||||
|
||||
def register_webhook(self, board: "Board", callback_url: str) -> bool:
|
||||
url = f"{board.base_url}/register?callback_url={callback_url}"
|
||||
ok = _post(url) is not None
|
||||
ok = _post(url, _auth(board, "POST", url)) is not None
|
||||
if ok:
|
||||
logger.info("Webhook registered on board '%s' → %s", board.name, callback_url)
|
||||
else:
|
||||
@@ -148,11 +177,13 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver):
|
||||
|
||||
def get_nfc_status(self, board: "Board") -> dict | None:
|
||||
"""Return current NFC reader status (last UID, access_state, auth config)."""
|
||||
return _get(f"{board.base_url}/nfc/status")
|
||||
url = f"{board.base_url}/nfc/status"
|
||||
return _get(url, _auth(board, "GET", url))
|
||||
|
||||
def get_nfc_config(self, board: "Board") -> dict | None:
|
||||
"""Return current NFC access-control configuration from the board."""
|
||||
return _get(f"{board.base_url}/nfc/config")
|
||||
url = f"{board.base_url}/nfc/config"
|
||||
return _get(url, _auth(board, "GET", url))
|
||||
|
||||
def set_nfc_config(
|
||||
self,
|
||||
@@ -173,7 +204,7 @@ class OlimexESP32C6EVBPn532Driver(BoardDriver):
|
||||
f"&relay={relay_num}"
|
||||
f"&pulse_ms={pulse_ms}"
|
||||
)
|
||||
result = _post(url)
|
||||
result = _post(url, _auth(board, "POST", url))
|
||||
if result:
|
||||
logger.info(
|
||||
"NFC config pushed to board '%s': uid='%s' relay=%d pulse=%dms",
|
||||
|
||||
@@ -43,6 +43,11 @@ class Board(db.Model):
|
||||
# Extra driver-specific config (JSON) – e.g. eWeLink credentials/token
|
||||
config_json = db.Column(db.Text, default="{}")
|
||||
|
||||
# HMAC-SHA256 shared secret for API request authentication.
|
||||
# Must match API_SECRET defined in the board firmware's secrets.h.
|
||||
# Leave None/empty to disable authentication (open access).
|
||||
api_secret = db.Column(db.String(128), nullable=True, default=None)
|
||||
|
||||
# ── relationships ────────────────────────────────────────────────
|
||||
workflows_trigger = db.relationship(
|
||||
"Workflow", foreign_keys="Workflow.trigger_board_id",
|
||||
|
||||
@@ -129,6 +129,7 @@ def edit_board(board_id: int):
|
||||
if lbl:
|
||||
labels[f"input_{n}"] = lbl
|
||||
board.labels = labels
|
||||
board.api_secret = request.form.get("api_secret", "").strip() or None
|
||||
|
||||
db.session.commit()
|
||||
flash("Board updated.", "success")
|
||||
|
||||
@@ -76,6 +76,21 @@
|
||||
{% endfor %}
|
||||
</div>
|
||||
|
||||
<!-- API Security -->
|
||||
<h6 class="text-secondary mb-3 text-uppercase small fw-semibold">API Security</h6>
|
||||
<div class="mb-4">
|
||||
<label class="form-label">API Secret <span class="text-secondary small">(HMAC-SHA256 shared secret)</span></label>
|
||||
<div class="input-group">
|
||||
<input type="text" name="api_secret" id="api-secret-input" class="form-control font-monospace"
|
||||
placeholder="Leave empty to disable API authentication"
|
||||
value="{{ board.api_secret or '' }}">
|
||||
<button class="btn btn-outline-secondary" type="button" onclick="genSecret()">
|
||||
<i class="bi bi-shuffle me-1"></i>Generate
|
||||
</button>
|
||||
</div>
|
||||
<div class="form-text">Must match <code>API_SECRET</code> in the board's <code>secrets.h</code>.</div>
|
||||
</div>
|
||||
|
||||
<div class="d-flex gap-2">
|
||||
<button type="submit" class="btn btn-primary"><i class="bi bi-check-lg me-1"></i> Save</button>
|
||||
<a href="{{ url_for('boards.board_detail', board_id=board.id) }}" class="btn btn-outline-secondary">Cancel</a>
|
||||
@@ -84,3 +99,14 @@
|
||||
</div>
|
||||
</div>
|
||||
{% endblock %}
|
||||
|
||||
{% block scripts %}
|
||||
<script>
|
||||
function genSecret() {
|
||||
const buf = new Uint8Array(32);
|
||||
crypto.getRandomValues(buf);
|
||||
document.getElementById('api-secret-input').value =
|
||||
Array.from(buf).map(b => b.toString(16).padStart(2, '0')).join('');
|
||||
}
|
||||
</script>
|
||||
{% endblock %}
|
||||
|
||||
Reference in New Issue
Block a user