Files
2026-06-11 00:44:19 +03:00

212 lines
8.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Olimex ESP32-C5-EVB board driver.
Hardware
--------
- 2 relays (outputs, 250V/10A)
- 2 opto-isolated inputs (110VAC-240VAC)
- HTTP REST API served directly by the board on port 80
API endpoints
-------------
POST /relay/on?relay=<n> → {"state": true}
POST /relay/off?relay=<n> → {"state": false}
POST /relay/toggle?relay=<n> → {"state": <new>}
GET /relay/status?relay=<n> → {"state": <bool>}
GET /input/status?input=<n> → {"state": <bool>}
POST /register?callback_url=<url> → {"status": "ok"}
GET /nfc/status → {"initialized": bool, "card_present": bool,
"last_uid": str, "access_state": str,
"auth_uid": str, "relay_num": int,
"pulse_ms": int}
GET /nfc/config → {"auth_uid": str, "relay_num": int, "pulse_ms": int}
POST /nfc/config?auth_uid=&relay=&pulse_ms= → {"status": "ok", ...}
Webhook (board → server)
------------------------
The board POSTs to the registered callback_url whenever an input changes:
POST <callback_url>
{"input": <n>, "state": <bool>}
"""
from __future__ import annotations
import hashlib
import hmac
import logging
import time as _time
import urllib.parse
import requests
from typing import TYPE_CHECKING
from app.drivers.base import BoardDriver
if TYPE_CHECKING:
from app.models.board import Board
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Timeout, in seconds, passed to every HTTP request sent to a board.
_TIMEOUT = 3
def _get(url: str, headers: dict | None = None) -> dict | None:
    """Send a GET request and return the decoded JSON body.

    Args:
        url: Full URL of the endpoint to query.
        headers: Optional extra request headers; ``None`` sends none.

    Returns:
        The parsed JSON response, or ``None`` when the request fails for any
        reason (connection error, timeout, HTTP error status, undecodable
        body).
    """
    try:
        response = requests.get(url, timeout=_TIMEOUT, headers=headers or {})
        response.raise_for_status()
        return response.json()
    except Exception as exc:
        # Deliberately best-effort: callers treat ``None`` as "no answer",
        # so the failure is only logged. The message keeps the URL and the
        # error visibly separated.
        logger.debug("GET %s failed: %s", url, exc)
        return None
def _post(url: str, headers: dict | None = None) -> dict | None:
    """Send a body-less POST request and return the decoded JSON body.

    Args:
        url: Full URL of the endpoint; any parameters travel in its query
            string.
        headers: Optional extra request headers; ``None`` sends none.

    Returns:
        The parsed JSON response, or ``None`` when the request fails for any
        reason (connection error, timeout, HTTP error status, undecodable
        body).
    """
    try:
        response = requests.post(url, timeout=_TIMEOUT, headers=headers or {})
        response.raise_for_status()
        return response.json()
    except Exception as exc:
        # Deliberately best-effort: callers treat ``None`` as "request
        # failed", so the failure is only logged. The message keeps the URL
        # and the error visibly separated.
        logger.debug("POST %s failed: %s", url, exc)
        return None
def _auth(board: "Board", method: str, url: str) -> dict:
    """Return the HMAC-SHA256 signing headers for a request to *board*.

    The signed message is ``"<method>:<path>:<timestamp>"``, where ``path``
    is the path component of *url* (the query string is not included) and
    ``timestamp`` is the current Unix time in whole seconds.

    Returns:
        ``{"X-Request-Time": ..., "X-Request-Sig": ...}``, or an empty dict
        when the board has no (or an empty) ``api_secret``.
    """
    api_secret: str = getattr(board, "api_secret", None) or ""
    if api_secret:
        request_path = urllib.parse.urlparse(url).path
        timestamp = str(int(_time.time()))
        payload = "{}:{}:{}".format(method, request_path, timestamp).encode()
        digest = hmac.new(api_secret.encode(), payload, hashlib.sha256)
        return {
            "X-Request-Time": timestamp,
            "X-Request-Sig": digest.hexdigest(),
        }
    # No secret configured: requests are sent unsigned.
    return {}
class OlimexESP32C5EVBDriver(BoardDriver):
    """Driver for the Olimex ESP32-C5-EVB board.

    Talks to the board's HTTP REST API through the module-level ``_get`` and
    ``_post`` helpers; every request carries the headers built by ``_auth``.
    """

    DRIVER_ID = "olimex_esp32_c5_evb"
    DISPLAY_NAME = "Olimex ESP32-C5-EVB"
    DESCRIPTION = "2 relays (250V/10A) · 2 opto-isolated inputs (110-240VAC) · HTTP REST + webhook callbacks"
    DEFAULT_NUM_RELAYS = 2
    DEFAULT_NUM_INPUTS = 2
    FIRMWARE_URL = "https://github.com/OLIMEX/ESP32-C5-EVB"

    # ── internal helpers ──────────────────────────────────────────────────────

    @staticmethod
    def _read_status(board: "Board", kind: str, index: int) -> dict | None:
        """GET ``/<kind>/status?<kind>=<index>``; *kind* is ``"relay"`` or ``"input"``."""
        url = f"{board.base_url}/{kind}/status?{kind}={index}"
        return _get(url, _auth(board, "GET", url))

    @staticmethod
    def _state_or_none(data: object) -> bool | None:
        """Return the boolean ``"state"`` of a board reply, or ``None`` if absent."""
        if isinstance(data, dict) and "state" in data:
            return bool(data["state"])
        return None

    # ── relay control ─────────────────────────────────────────────────────────

    def get_relay_status(self, board: "Board", relay_num: int) -> bool | None:
        """Return the state of relay *relay_num*.

        Returns:
            ``True``/``False`` for on/off, or ``None`` when the request
            failed or the reply carried no ``"state"`` field.
        """
        return self._state_or_none(self._read_status(board, "relay", relay_num))

    def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool:
        """Switch relay *relay_num* on (``state`` truthy) or off.

        Returns:
            ``True`` when the board answered the request, ``False`` otherwise.
        """
        action = "on" if state else "off"
        url = f"{board.base_url}/relay/{action}?relay={relay_num}"
        return _post(url, _auth(board, "POST", url)) is not None

    def toggle_relay(self, board: "Board", relay_num: int) -> bool | None:
        """Toggle relay *relay_num* and return its new state.

        Returns:
            The state reported by the board, or ``None`` when the request
            failed or the reply carried no ``"state"`` field.
        """
        url = f"{board.base_url}/relay/toggle?relay={relay_num}"
        return self._state_or_none(_post(url, _auth(board, "POST", url)))

    # ── poll ──────────────────────────────────────────────────────────────────

    def poll(self, board: "Board") -> dict:
        """Read every relay and input state from the board.

        Returns:
            A dict with keys ``"relay_states"`` (``{"relay_<n>": bool}``),
            ``"input_states"`` (``{"input_<n>": bool}``) and ``"is_online"``.
            When the first request gets no answer, or the board has neither
            relays nor inputs configured, both state dicts are empty and
            ``"is_online"`` is ``False``. Endpoints that fail after a
            successful probe are simply left out of the result.
        """
        offline = {"relay_states": {}, "input_states": {}, "is_online": False}
        relay_states: dict = {}
        input_states: dict = {}

        # ── Connectivity probe ─────────────────────────────────────────────
        # Try the very first endpoint. If the board doesn't reply we know
        # it's offline and skip all remaining requests (saving N × 3 s worth
        # of connect timeouts on every recheck cycle).
        if board.num_relays > 0:
            probe_kind, probe_states = "relay", relay_states
        elif board.num_inputs > 0:
            probe_kind, probe_states = "input", input_states
        else:
            # Nothing to query, so the board is reported as offline.
            return offline

        probe = self._read_status(board, probe_kind, 1)
        if probe is None:
            return offline
        probe_states[f"{probe_kind}_1"] = bool(probe.get("state", False))

        # Board is reachable — collect remaining endpoints
        for n in range(2, board.num_relays + 1):
            data = self._read_status(board, "relay", n)
            if data is not None:
                relay_states[f"relay_{n}"] = bool(data.get("state", False))

        # Input 1 was already read above when it served as the probe.
        first_input = 2 if probe_kind == "input" else 1
        for n in range(first_input, board.num_inputs + 1):
            data = self._read_status(board, "input", n)
            if data is not None:
                # A missing "state" defaults to False, matching the probe and
                # the relay handling above.
                input_states[f"input_{n}"] = bool(data.get("state", False))

        return {
            "relay_states": relay_states,
            "input_states": input_states,
            "is_online": True,
        }

    # ── webhook registration ──────────────────────────────────────────────────

    def register_webhook(self, board: "Board", callback_url: str) -> bool:
        """Ask the board to POST input changes to *callback_url*.

        Returns:
            ``True`` when the board answered the request, ``False`` otherwise.
        """
        # NOTE(review): callback_url is placed in the query string unescaped,
        # so a URL containing '&' or '#' may not reach the board intact.
        # Confirm whether the firmware URL-decodes the parameter before
        # adding percent-encoding here.
        url = f"{board.base_url}/register?callback_url={callback_url}"
        ok = _post(url, _auth(board, "POST", url)) is not None
        if ok:
            logger.info("Webhook registered on board '%s': %s", board.name, callback_url)
        else:
            logger.warning("Webhook registration failed for board '%s'", board.name)
        return ok

    # ── NFC access control ──────────────────────────────────────────────────────

    def get_nfc_status(self, board: "Board") -> dict | None:
        """Return current NFC reader status (last UID, access_state, auth config).

        Returns:
            The board's JSON reply, or ``None`` when the request failed.
        """
        url = f"{board.base_url}/nfc/status"
        return _get(url, _auth(board, "GET", url))

    def set_nfc_config(
        self,
        board: "Board",
        auth_uid: str = "",
        relay_num: int = 1,
        pulse_ms: int = 3000,
    ) -> bool:
        """Push NFC access-control config to the board.

        Args:
            board: Target board.
            auth_uid: Card UID to send; it is upper-cased and percent-encoded
                before being placed in the query string.
            relay_num: Value sent as the ``relay`` parameter.
            pulse_ms: Value sent as the ``pulse_ms`` parameter.

        Returns:
            ``True`` when the board answered the request, ``False`` otherwise.
        """
        uid = auth_uid.upper()
        url = (
            f"{board.base_url}/nfc/config"
            f"?auth_uid={urllib.parse.quote(uid)}"
            f"&relay={relay_num}"
            f"&pulse_ms={pulse_ms}"
        )
        pushed = _post(url, _auth(board, "POST", url)) is not None
        # Log on the same condition that is returned, so an empty JSON object
        # from the board is not logged as a failure while reporting success.
        if pushed:
            logger.info(
                "NFC config pushed to board '%s': uid='%s' relay=%d pulse=%dms",
                board.name, uid, relay_num, pulse_ms,
            )
        else:
            logger.warning("NFC config push failed for board '%s'", board.name)
        return pushed

    def set_nfc_enabled(self, board: "Board", enabled: bool) -> bool:
        """Enable or disable the NFC/Mifare access-control module on the board.

        Returns:
            ``True`` when the board answered the request, ``False`` otherwise.
        """
        state = 1 if enabled else 0
        url = f"{board.base_url}/nfc/enable?state={state}"
        applied = _post(url, _auth(board, "POST", url)) is not None
        if applied:
            logger.info(
                "NFC module on board '%s' set to: %s",
                board.name, "ENABLED" if enabled else "DISABLED",
            )
        else:
            logger.warning("NFC enable/disable failed for board '%s'", board.name)
        return applied