Files
location_managemet/app/drivers/olimex_esp32_c6_evb/driver.py
scheianu 4766faa682 Add NFC access control support to Olimex ESP32-C6-EVB driver
- Add get_nfc_status() to retrieve NFC reader state (last UID, access state, auth config)
- Add set_nfc_config() to push NFC access-control config (auth_uid, relay_num, pulse_ms)
- Update API endpoint documentation in module docstring
2026-03-15 08:46:35 +02:00

179 lines
7.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Olimex ESP32-C6-EVB board driver.
Hardware
--------
- 4 relays (outputs)
- 4 digital inputs
- HTTP REST API served directly by the board on port 80
API endpoints
-------------
POST /relay/on?relay=<n> → {"state": true}
POST /relay/off?relay=<n> → {"state": false}
POST /relay/toggle?relay=<n> → {"state": <new>}
GET /relay/status?relay=<n> → {"state": <bool>}
GET /input/status?input=<n> → {"state": <bool>}
POST /register?callback_url=<url> → {"status": "ok"}
GET /nfc/status → {"initialized": bool, "card_present": bool,
"last_uid": str, "access_state": str,
"auth_uid": str, "relay_num": int,
"pulse_ms": int}
GET /nfc/config → {"auth_uid": str, "relay_num": int, "pulse_ms": int}
POST /nfc/config?auth_uid=&relay=&pulse_ms= → {"status": "ok", ...}
Webhook (board → server)
------------------------
The board POSTs to the registered callback_url whenever an input changes:
POST <callback_url>
{"input": <n>, "state": <bool>}
"""
from __future__ import annotations
import logging
import requests
from typing import TYPE_CHECKING
from app.drivers.base import BoardDriver
if TYPE_CHECKING:
from app.models.board import Board
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Passed as ``timeout=`` to every requests.get / requests.post call in this
# module (requests interprets the value in seconds).
_TIMEOUT = 3
def _get(url: str) -> dict | None:
    """Send a GET request to *url* and return the decoded JSON object.

    This is a best-effort helper: it never raises.  Any failure — connection
    error, timeout, non-2xx status, a body that is not valid JSON, or JSON
    that is not an object — is logged at DEBUG level and reported as ``None``
    so callers can simply treat the board as unreachable.

    Args:
        url: Fully qualified endpoint URL, including any query string.

    Returns:
        The parsed JSON object as a ``dict``, or ``None`` on any failure.
    """
    try:
        response = requests.get(url, timeout=_TIMEOUT)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        # Deliberate catch-all: callers rely on "None means unreachable".
        logger.debug("GET %s failed: %s", url, exc)
        return None
    if not isinstance(payload, dict):
        # Callers index the result by key, so anything but an object is
        # unusable and is reported like any other failure.
        logger.debug("GET %s returned non-object JSON: %r", url, payload)
        return None
    return payload
def _post(url: str) -> dict | None:
    """Send a body-less POST request to *url* and return the decoded JSON object.

    This is a best-effort helper: it never raises.  Any failure — connection
    error, timeout, non-2xx status, a body that is not valid JSON, or JSON
    that is not an object — is logged at DEBUG level and reported as ``None``
    so callers can simply treat the request as failed.

    Args:
        url: Fully qualified endpoint URL; all parameters travel in the
            query string.

    Returns:
        The parsed JSON object as a ``dict``, or ``None`` on any failure.
    """
    try:
        response = requests.post(url, timeout=_TIMEOUT)
        response.raise_for_status()
        payload = response.json()
    except Exception as exc:
        # Deliberate catch-all: callers rely on "None means failure".
        logger.debug("POST %s failed: %s", url, exc)
        return None
    if not isinstance(payload, dict):
        # Callers index the result by key, so anything but an object is
        # unusable and is reported like any other failure.
        logger.debug("POST %s returned non-object JSON: %r", url, payload)
        return None
    return payload
class OlimexESP32C6EVBDriver(BoardDriver):
    """Driver for the Olimex ESP32-C6-EVB board.

    Talks to the board's HTTP REST API through the module-level ``_get`` and
    ``_post`` helpers.  Those helpers return ``None`` on any failure, so the
    methods here report errors through their return value (``None`` /
    ``False`` / ``is_online: False``) rather than by raising.
    """

    DRIVER_ID = "olimex_esp32_c6_evb"
    DISPLAY_NAME = "Olimex ESP32-C6-EVB"
    DESCRIPTION = "4 relays · 4 digital inputs · HTTP REST + webhook callbacks"
    DEFAULT_NUM_RELAYS = 4
    DEFAULT_NUM_INPUTS = 4
    FIRMWARE_URL = "https://github.com/OLIMEX/ESP32-C6-EVB"

    @staticmethod
    def _read_state(data: dict | None) -> bool | None:
        """Return the ``"state"`` field of a board reply as a bool.

        Returns ``None`` when the request failed or the reply carries no
        ``"state"`` key, so a malformed reply is reported as "unknown"
        instead of raising ``KeyError``.
        """
        if not isinstance(data, dict) or "state" not in data:
            return None
        return bool(data["state"])

    # ── relay control ─────────────────────────────────────────────────────────
    def get_relay_status(self, board: "Board", relay_num: int) -> bool | None:
        """Return the state of relay *relay_num*, or ``None`` if it is unknown."""
        data = _get(f"{board.base_url}/relay/status?relay={relay_num}")
        return self._read_state(data)

    def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool:
        """Switch relay *relay_num* on or off; return ``True`` if the board replied."""
        action = "on" if state else "off"
        return _post(f"{board.base_url}/relay/{action}?relay={relay_num}") is not None

    def toggle_relay(self, board: "Board", relay_num: int) -> bool | None:
        """Toggle relay *relay_num*; return its new state, or ``None`` if unknown."""
        data = _post(f"{board.base_url}/relay/toggle?relay={relay_num}")
        return self._read_state(data)

    # ── poll ──────────────────────────────────────────────────────────────────
    def poll(self, board: "Board") -> dict:
        """Read every relay and input state from the board.

        Returns:
            A dict with keys ``"relay_states"`` (``{"relay_<n>": bool}``),
            ``"input_states"`` (``{"input_<n>": bool}``) and ``"is_online"``.
            When the first request fails, or the board has neither relays nor
            inputs configured, both state maps are empty and ``is_online`` is
            ``False``.  Endpoints that fail after a successful probe are
            simply left out of the maps.
        """
        offline = {"relay_states": {}, "input_states": {}, "is_online": False}
        relay_states: dict = {}
        input_states: dict = {}

        # ── Connectivity probe ─────────────────────────────────────────────
        # Query a single endpoint first.  If the board does not answer it is
        # considered offline and the remaining requests are skipped, which
        # avoids paying one connect timeout per endpoint on every cycle.
        if board.num_relays > 0:
            probe = _get(f"{board.base_url}/relay/status?relay=1")
            if probe is None:
                return offline
            relay_states["relay_1"] = bool(probe.get("state", False))
            first_input = 1
        elif board.num_inputs > 0:
            probe = _get(f"{board.base_url}/input/status?input=1")
            if probe is None:
                return offline
            input_states["input_1"] = bool(probe.get("state", False))
            # Input 1 was already read by the probe; do not fetch it twice.
            first_input = 2
        else:
            return offline

        # Board is reachable — collect the remaining endpoints.
        for n in range(2, board.num_relays + 1):
            data = _get(f"{board.base_url}/relay/status?relay={n}")
            if data is not None:
                relay_states[f"relay_{n}"] = bool(data.get("state", False))

        for n in range(first_input, board.num_inputs + 1):
            data = _get(f"{board.base_url}/input/status?input={n}")
            if data is not None:
                # NOTE(review): a missing "state" defaults to True here but to
                # False for the input_1 probe above — confirm which default is
                # intended before unifying them.
                input_states[f"input_{n}"] = bool(data.get("state", True))

        return {
            "relay_states": relay_states,
            "input_states": input_states,
            "is_online": True,
        }

    # ── webhook registration ──────────────────────────────────────────────────
    def register_webhook(self, board: "Board", callback_url: str) -> bool:
        """Ask the board to POST input changes to *callback_url*.

        The callback URL is percent-encoded before being placed in the query
        string so that characters such as ``&``, ``#``, ``?`` or spaces inside
        it cannot split or truncate the request.  ``:`` and ``/`` are kept
        literal, so ordinary ``http://host:port/path`` URLs are sent unchanged.

        Returns:
            ``True`` if the board replied, ``False`` otherwise.
        """
        import urllib.parse

        encoded_callback = urllib.parse.quote(callback_url, safe=":/")
        url = f"{board.base_url}/register?callback_url={encoded_callback}"
        ok = _post(url) is not None
        if ok:
            logger.info("Webhook registered on board '%s' \u2192 %s", board.name, callback_url)
        else:
            logger.warning("Webhook registration failed for board '%s'", board.name)
        return ok

    # ── NFC access control ──────────────────────────────────────────────────────
    def get_nfc_status(self, board: "Board") -> dict | None:
        """Return current NFC reader status (last UID, access_state, auth config)."""
        return _get(f"{board.base_url}/nfc/status")

    def set_nfc_config(
        self,
        board: "Board",
        auth_uid: str = "",
        relay_num: int = 1,
        pulse_ms: int = 3000,
    ) -> bool:
        """Push NFC access-control config to the board.

        Args:
            board: Target board.
            auth_uid: Authorized card UID (e.g. "04:AB:CD:EF"); empty = any
                card opens relay.  Sent upper-cased and percent-encoded.
            relay_num: Which relay to open on a matching card (1-4).
            pulse_ms: How long to hold the relay open in milliseconds
                (100-60000).

        The documented ranges are not validated here; values are forwarded
        to the board as given.

        Returns:
            ``True`` if the board replied, ``False`` otherwise.
        """
        import urllib.parse

        url = (
            f"{board.base_url}/nfc/config"
            f"?auth_uid={urllib.parse.quote(auth_uid.upper())}"
            f"&relay={relay_num}"
            f"&pulse_ms={pulse_ms}"
        )
        # Decide success once so the log line always agrees with the return
        # value (an empty JSON object is still a successful reply).
        ok = _post(url) is not None
        if ok:
            logger.info(
                "NFC config pushed to board '%s': uid='%s' relay=%d pulse=%dms",
                board.name, auth_uid, relay_num, pulse_ms,
            )
        else:
            logger.warning("NFC config push failed for board '%s'", board.name)
        return ok