From 916e35b22b1de388a64ab717c7b5470beb0f2554 Mon Sep 17 00:00:00 2001 From: scheianu Date: Thu, 11 Jun 2026 00:44:19 +0300 Subject: [PATCH] updated board from olimex c5 --- app/drivers/olimex_esp32_c5_evb/__init__.py | 0 app/drivers/olimex_esp32_c5_evb/driver.py | 211 ++++++++++++++++++ app/drivers/olimex_esp32_c5_evb/manifest.json | 18 ++ app/routes/boards.py | 33 +-- app/templates/boards/detail.html | 2 +- app/templates/boards/nfc.html | 20 +- 6 files changed, 257 insertions(+), 27 deletions(-) create mode 100644 app/drivers/olimex_esp32_c5_evb/__init__.py create mode 100644 app/drivers/olimex_esp32_c5_evb/driver.py create mode 100644 app/drivers/olimex_esp32_c5_evb/manifest.json diff --git a/app/drivers/olimex_esp32_c5_evb/__init__.py b/app/drivers/olimex_esp32_c5_evb/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/drivers/olimex_esp32_c5_evb/driver.py b/app/drivers/olimex_esp32_c5_evb/driver.py new file mode 100644 index 0000000..76e6c01 --- /dev/null +++ b/app/drivers/olimex_esp32_c5_evb/driver.py @@ -0,0 +1,211 @@ +"""Olimex ESP32-C5-EVB board driver. + +Hardware +-------- +- 2 relays (outputs, 250V/10A) +- 2 opto-isolated inputs (110VAC-240VAC) +- HTTP REST API served directly by the board on port 80 + +API endpoints +------------- +POST /relay/on?relay= → {"state": true} +POST /relay/off?relay= → {"state": false} +POST /relay/toggle?relay= → {"state": } +GET /relay/status?relay= → {"state": } +GET /input/status?input= → {"state": } +POST /register?callback_url= → {"status": "ok"} +GET /nfc/status → {"initialized": bool, "card_present": bool, + "last_uid": str, "access_state": str, + "auth_uid": str, "relay_num": int, + "pulse_ms": int} +GET /nfc/config → {"auth_uid": str, "relay_num": int, "pulse_ms": int} +POST /nfc/config?auth_uid=&relay=&pulse_ms= → {"status": "ok", ...} + +Webhook (board → server) +------------------------ +The board POSTs to the registered callback_url whenever an input changes: + POST + {"input": , "state": } +""" +from __future__ import annotations + +import hashlib +import hmac +import logging +import time as _time +import urllib.parse +import requests +from typing import TYPE_CHECKING + +from app.drivers.base import BoardDriver + +if TYPE_CHECKING: + from app.models.board import Board + +logger = logging.getLogger(__name__) +_TIMEOUT = 3 + + +def _get(url: str, headers: dict | None = None) -> dict | None: + try: + r = requests.get(url, timeout=_TIMEOUT, headers=headers or {}) + r.raise_for_status() + return r.json() + except Exception as exc: + logger.debug("GET %s → %s", url, exc) + return None + + +def _post(url: str, headers: dict | None = None) -> dict | None: + try: + r = requests.post(url, timeout=_TIMEOUT, headers=headers or {}) + r.raise_for_status() + return r.json() + except Exception as exc: + logger.debug("POST %s → %s", url, exc) + return None + + +def _auth(board: "Board", method: str, url: str) -> dict: + """Build HMAC-SHA256 auth headers. Returns {} when no api_secret set.""" + secret: str = getattr(board, "api_secret", None) or "" + if not secret: + return {} + path = urllib.parse.urlparse(url).path + ts = str(int(_time.time())) + msg = f"{method}:{path}:{ts}".encode() + sig = hmac.new(secret.encode(), msg, hashlib.sha256).hexdigest() + return {"X-Request-Time": ts, "X-Request-Sig": sig} + + +class OlimexESP32C5EVBDriver(BoardDriver): + """Driver for the Olimex ESP32-C5-EVB board.""" + + DRIVER_ID = "olimex_esp32_c5_evb" + DISPLAY_NAME = "Olimex ESP32-C5-EVB" + DESCRIPTION = "2 relays (250V/10A) · 2 opto-isolated inputs (110-240VAC) · HTTP REST + webhook callbacks" + DEFAULT_NUM_RELAYS = 2 + DEFAULT_NUM_INPUTS = 2 + FIRMWARE_URL = "https://github.com/OLIMEX/ESP32-C5-EVB" + + # ── relay control ───────────────────────────────────────────────────────── + + def get_relay_status(self, board: "Board", relay_num: int) -> bool | None: + url = f"{board.base_url}/relay/status?relay={relay_num}" + data = _get(url, _auth(board, "GET", url)) + return bool(data["state"]) if data is not None else None + + def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool: + action = "on" if state else "off" + url = f"{board.base_url}/relay/{action}?relay={relay_num}" + return _post(url, _auth(board, "POST", url)) is not None + + def toggle_relay(self, board: "Board", relay_num: int) -> bool | None: + url = f"{board.base_url}/relay/toggle?relay={relay_num}" + data = _post(url, _auth(board, "POST", url)) + return bool(data["state"]) if data is not None else None + + # ── poll ────────────────────────────────────────────────────────────────── + + def poll(self, board: "Board") -> dict: + _offline = {"relay_states": {}, "input_states": {}, "is_online": False} + + # ── Connectivity probe ───────────────────────────────────────────── + # Try the very first endpoint. If the board doesn't reply we know + # it's offline and skip all remaining requests (saving N × 3 s worth + # of connect timeouts on every recheck cycle). + relay_states: dict = {} + input_states: dict = {} + + if board.num_relays > 0: + url = f"{board.base_url}/relay/status?relay=1" + probe = _get(url, _auth(board, "GET", url)) + if probe is None: + return _offline + relay_states["relay_1"] = bool(probe.get("state", False)) + elif board.num_inputs > 0: + url = f"{board.base_url}/input/status?input=1" + probe = _get(url, _auth(board, "GET", url)) + if probe is None: + return _offline + input_states["input_1"] = bool(probe.get("state", False)) + else: + return _offline + + # Board is reachable — collect remaining endpoints + for n in range(2, board.num_relays + 1): + url = f"{board.base_url}/relay/status?relay={n}" + data = _get(url, _auth(board, "GET", url)) + if data is not None: + relay_states[f"relay_{n}"] = bool(data.get("state", False)) + + input_start = 2 if (board.num_relays == 0 and board.num_inputs > 0) else 1 + for n in range(input_start, board.num_inputs + 1): + url = f"{board.base_url}/input/status?input={n}" + data = _get(url, _auth(board, "GET", url)) + if data is not None: + input_states[f"input_{n}"] = bool(data.get("state", True)) + + return { + "relay_states": relay_states, + "input_states": input_states, + "is_online": True, + } + + # ── webhook registration ────────────────────────────────────────────────── + + def register_webhook(self, board: "Board", callback_url: str) -> bool: + url = f"{board.base_url}/register?callback_url={callback_url}" + ok = _post(url, _auth(board, "POST", url)) is not None + if ok: + logger.info("Webhook registered on board '%s' → %s", board.name, callback_url) + else: + logger.warning("Webhook registration failed for board '%s'", board.name) + return ok + + # ── NFC access control ────────────────────────────────────────────────────── + + def get_nfc_status(self, board: "Board") -> dict | None: + """Return current NFC reader status (last UID, access_state, auth config).""" + url = f"{board.base_url}/nfc/status" + return _get(url, _auth(board, "GET", url)) + + def set_nfc_config( + self, + board: "Board", + auth_uid: str = "", + relay_num: int = 1, + pulse_ms: int = 3000, + ) -> bool: + """Push NFC access-control config to the board.""" + import urllib.parse + + url = ( + f"{board.base_url}/nfc/config" + f"?auth_uid={urllib.parse.quote(auth_uid.upper())}" + f"&relay={relay_num}" + f"&pulse_ms={pulse_ms}" + ) + result = _post(url, _auth(board, "POST", url)) + if result: + logger.info( + "NFC config pushed to board '%s': uid='%s' relay=%d pulse=%dms", + board.name, auth_uid, relay_num, pulse_ms, + ) + else: + logger.warning("NFC config push failed for board '%s'", board.name) + return result is not None + + def set_nfc_enabled(self, board: "Board", enabled: bool) -> bool: + """Enable or disable the NFC/Mifare access-control module on the board.""" + state = 1 if enabled else 0 + url = f"{board.base_url}/nfc/enable?state={state}" + result = _post(url, _auth(board, "POST", url)) + if result is not None: + logger.info( + "NFC module on board '%s' set to: %s", + board.name, "ENABLED" if enabled else "DISABLED", + ) + else: + logger.warning("NFC enable/disable failed for board '%s'", board.name) + return result is not None diff --git a/app/drivers/olimex_esp32_c5_evb/manifest.json b/app/drivers/olimex_esp32_c5_evb/manifest.json new file mode 100644 index 0000000..bdd1f84 --- /dev/null +++ b/app/drivers/olimex_esp32_c5_evb/manifest.json @@ -0,0 +1,18 @@ +{ + "driver_id": "olimex_esp32_c5_evb", + "display_name": "Olimex ESP32-C5-EVB", + "description": "Olimex ESP32-C5-EVB board with 2 relays (250V/10A) and 2 opto-isolated inputs (110-240VAC). Communicates via HTTP REST API with webhook callbacks.", + "manufacturer": "Olimex", + "chip": "ESP32-C5 WROOM-1", + "default_num_relays": 2, + "default_num_inputs": 2, + "firmware_url": "https://github.com/OLIMEX/ESP32-C5-EVB", + "api": { + "relay_on": "POST /relay/on?relay={n}", + "relay_off": "POST /relay/off?relay={n}", + "relay_toggle": "POST /relay/toggle?relay={n}", + "relay_status": "GET /relay/status?relay={n}", + "input_status": "GET /input/status?input={n}", + "register": "POST /register?callback_url={url}" + } +} diff --git a/app/routes/boards.py b/app/routes/boards.py index 95a3508..7d421ee 100644 --- a/app/routes/boards.py +++ b/app/routes/boards.py @@ -10,6 +10,7 @@ from app.drivers.registry import registry from flask import current_app _NFC_DRIVER_ID = "olimex_esp32_c6_evb_pn532" +_NFC_DRIVER_IDS = {"olimex_esp32_c6_evb_pn532", "olimex_esp32_c5_evb"} boards_bp = Blueprint("boards", __name__) @@ -290,9 +291,9 @@ def edit_labels(board_id: int): @login_required def nfc_management(board_id: int): board = db.get_or_404(Board, board_id) - if board.board_type != _NFC_DRIVER_ID: + if board.board_type not in _NFC_DRIVER_IDS: abort(404) - driver = registry.get(_NFC_DRIVER_ID) + driver = registry.get(board.board_type) nfc_status = driver.get_nfc_status(board) if driver else None return render_template( "boards/nfc.html", @@ -305,9 +306,9 @@ def nfc_management(board_id: int): @login_required def nfc_status_json(board_id: int): board = db.get_or_404(Board, board_id) - if board.board_type != _NFC_DRIVER_ID: + if board.board_type not in _NFC_DRIVER_IDS: abort(404) - driver = registry.get(_NFC_DRIVER_ID) + driver = registry.get(board.board_type) data = driver.get_nfc_status(board) if driver else None if data is None: return jsonify({"error": "Board unreachable"}), 502 @@ -320,22 +321,22 @@ def nfc_config_save(board_id: int): if not current_user.is_admin(): abort(403) board = db.get_or_404(Board, board_id) - if board.board_type != _NFC_DRIVER_ID: + if board.board_type not in _NFC_DRIVER_IDS: abort(404) - driver = registry.get(_NFC_DRIVER_ID) + driver = registry.get(board.board_type) if not driver: flash("NFC driver not available.", "danger") return redirect(url_for("boards.nfc_management", board_id=board_id)) auth_uid = request.form.get("auth_uid", "").strip().upper() relay_num = int(request.form.get("relay_num", 1)) - pulse_ms = int(request.form.get("pulse_ms", 3000)) + pulse_ms = int(request.form.get("pulse_ms", 0)) - if relay_num < 1 or relay_num > 4: - flash("Relay number must be 1–4.", "danger") + if relay_num < 1 or relay_num > board.num_relays: + flash(f"Relay number must be 1–{board.num_relays}.", "danger") return redirect(url_for("boards.nfc_management", board_id=board_id)) - if pulse_ms < 100 or pulse_ms > 60000: - flash("Absence timeout must be between 100 and 60 000 ms.", "danger") + if pulse_ms < 0 or pulse_ms > 60000: + flash("Release delay must be between 0 and 60 000 ms.", "danger") return redirect(url_for("boards.nfc_management", board_id=board_id)) ok = driver.set_nfc_config(board, auth_uid=auth_uid, relay_num=relay_num, pulse_ms=pulse_ms) @@ -354,9 +355,9 @@ def nfc_enroll(board_id: int): if not current_user.is_admin(): abort(403) board = db.get_or_404(Board, board_id) - if board.board_type != _NFC_DRIVER_ID: + if board.board_type not in _NFC_DRIVER_IDS: abort(404) - driver = registry.get(_NFC_DRIVER_ID) + driver = registry.get(board.board_type) if not driver: flash("NFC driver not available.", "danger") return redirect(url_for("boards.nfc_management", board_id=board_id)) @@ -372,7 +373,7 @@ def nfc_enroll(board_id: int): return redirect(url_for("boards.nfc_management", board_id=board_id)) relay_num = int(request.form.get("relay_num", status.get("relay_num", 1))) - pulse_ms = int(request.form.get("pulse_ms", status.get("pulse_ms", 3000))) + pulse_ms = int(request.form.get("pulse_ms", status.get("pulse_ms", 0))) ok = driver.set_nfc_config(board, auth_uid=uid, relay_num=relay_num, pulse_ms=pulse_ms) if ok: @@ -389,9 +390,9 @@ def nfc_enable(board_id: int): if not current_user.is_admin(): abort(403) board = db.get_or_404(Board, board_id) - if board.board_type != _NFC_DRIVER_ID: + if board.board_type not in _NFC_DRIVER_IDS: abort(404) - driver = registry.get(_NFC_DRIVER_ID) + driver = registry.get(board.board_type) if not driver: flash("NFC driver not available.", "danger") return redirect(url_for("boards.nfc_management", board_id=board_id)) diff --git a/app/templates/boards/detail.html b/app/templates/boards/detail.html index 04ccb50..abe1622 100644 --- a/app/templates/boards/detail.html +++ b/app/templates/boards/detail.html @@ -20,7 +20,7 @@ {% if current_user.is_admin() %}
- {% if board.board_type == 'olimex_esp32_c6_evb_pn532' %} + {% if board.board_type == 'olimex_esp32_c6_evb_pn532' or board.board_type == 'olimex_esp32_c5_evb' %} NFC Access Control diff --git a/app/templates/boards/nfc.html b/app/templates/boards/nfc.html index 3205ff7..33f1e38 100644 --- a/app/templates/boards/nfc.html +++ b/app/templates/boards/nfc.html @@ -98,7 +98,7 @@
Trigger relay
Relay {{ nfc.get('relay_num', 1) }}
Absence timeout
-
{{ nfc.get('pulse_ms', 3000) }} ms
+
{{ nfc.get('pulse_ms', 0) }} ms
@@ -152,16 +152,16 @@
- + + class="form-control" min="0" max="60000" + value="{{ nfc.get('pulse_ms', 0) }}">
- + -
Relay closes this many ms after the card is removed (100 – 60 000).
+ min="0" max="60000" value="{{ nfc.get('pulse_ms', 0) }}"> +
Relay turns off this many ms after the card is removed (0 = immediately).