105 lines
3.8 KiB
Python
105 lines
3.8 KiB
Python
"""Driver registry — auto-discovers board drivers from sub-folders.

At application startup call ``registry.discover()``. Every sub-folder of
``app/drivers/`` that contains a ``driver.py`` with a ``BoardDriver`` subclass
is registered automatically. No manual imports needed.

Usage
-----
::

    from app.drivers.registry import registry

    driver = registry.get("olimex_esp32_c6_evb")
    driver.set_relay(board, 1, True)

    # All registered drivers (for UI drop-downs):
    all_drivers = registry.all()
"""
|
|
from __future__ import annotations
|
|
|
|
import importlib
|
|
import inspect
|
|
import logging
|
|
import os
|
|
from typing import TYPE_CHECKING
|
|
|
|
from app.drivers.base import BoardDriver
|
|
|
|
if TYPE_CHECKING:
|
|
pass
|
|
|
|
# Logger named after this module; used for discovery and registration messages.
logger = logging.getLogger(__name__)
|
|
|
|
# Directory containing this module; discover() scans its sub-folders for drivers.
_DRIVERS_DIR = os.path.dirname(__file__)
|
|
|
|
|
|
class DriverRegistry:
    """Registry of board-driver instances keyed by their ``DRIVER_ID``.

    A new registry is empty. :meth:`discover` imports
    ``app.drivers.<folder>.driver`` for every eligible sub-folder of the
    drivers directory and stores one instantiated driver per folder.
    """

    def __init__(self):
        # Maps DRIVER_ID -> instantiated driver, in registration order.
        self._drivers: dict[str, BoardDriver] = {}

    # ── discovery ─────────────────────────────────────────────────────────────

    def discover(self) -> None:
        """Scan ``app/drivers/`` sub-folders and register found drivers.

        Entries are processed in name order. Anything that is not a
        directory, or whose name starts with ``_`` or ``.``, is ignored, as
        are folders without a ``driver.py``. A folder whose driver fails to
        import or instantiate is logged and skipped; the error is not raised.
        """
        # Close the directory handle deterministically instead of relying on
        # iterator exhaustion / garbage collection.
        with os.scandir(_DRIVERS_DIR) as scan:
            entries = sorted(scan, key=lambda e: e.name)

        for entry in entries:
            if not entry.is_dir() or entry.name.startswith(("_", ".")):
                continue
            driver_module_path = os.path.join(entry.path, "driver.py")
            if not os.path.isfile(driver_module_path):
                logger.debug("Skipping %s — no driver.py found", entry.name)
                continue
            module_name = f"app.drivers.{entry.name}.driver"
            try:
                mod = importlib.import_module(module_name)
                self._register_from_module(mod, entry.name)
            except Exception as exc:
                # Deliberately broad: one broken driver folder must not
                # prevent the remaining drivers from loading.
                logger.exception("Failed to load driver '%s': %s", entry.name, exc)

    def _register_from_module(self, mod, folder_name: str) -> None:
        """Instantiate and register one ``BoardDriver`` subclass from *mod*.

        Candidates are the classes in *mod* that subclass ``BoardDriver``
        (excluding ``BoardDriver`` itself) and have a truthy ``DRIVER_ID``.
        Classes defined in *mod* are preferred over classes merely imported
        into it, so a driver that imports its base class from another folder
        is not shadowed by that base class. Among equals, the first by class
        name wins. A warning is logged when no candidate exists.

        Args:
            mod: The imported ``driver`` module.
            folder_name: Name of the driver folder, used in log messages.
        """
        candidates = [
            cls
            for _name, cls in inspect.getmembers(mod, inspect.isclass)
            if issubclass(cls, BoardDriver)
            and cls is not BoardDriver
            and getattr(cls, "DRIVER_ID", None)
        ]
        if not candidates:
            logger.warning(
                "Folder '%s' has driver.py but no BoardDriver subclass with DRIVER_ID found",
                folder_name,
            )
            return

        module_name = getattr(mod, "__name__", None)
        defined_here = [cls for cls in candidates if cls.__module__ == module_name]
        # Fall back to imported classes so a driver.py that only re-exports
        # its driver class keeps working.
        driver_cls = (defined_here or candidates)[0]
        driver_id = driver_cls.DRIVER_ID

        instance = driver_cls()
        previous = self._drivers.get(driver_id)
        # Re-running discover() re-registers the same class; only warn when a
        # different class claims an ID that is already taken.
        if previous is not None and type(previous) is not driver_cls:
            logger.warning(
                "Driver ID '%s' from folder '%s' replaces previously registered %s",
                driver_id, folder_name, type(previous).__name__,
            )
        self._drivers[driver_id] = instance
        logger.info(
            "Board driver registered: '%s' (%s) from folder '%s'",
            driver_id, driver_cls.DISPLAY_NAME, folder_name,
        )

    # ── lookup ────────────────────────────────────────────────────────────────

    def get(self, driver_id: str) -> BoardDriver | None:
        """Return the driver registered under *driver_id*, or ``None``."""
        return self._drivers.get(driver_id)

    def get_or_default(self, driver_id: str) -> BoardDriver:
        """Return driver or fall back to the generic_esp32 stub.

        Lookup order: *driver_id*, then ``"generic_esp32"``, then the first
        registered driver.

        Raises:
            LookupError: If no driver is registered at all.
        """
        # Compare against None rather than truthiness so a driver object that
        # happens to evaluate falsy is still returned.
        for key in (driver_id, "generic_esp32"):
            driver = self._drivers.get(key)
            if driver is not None:
                return driver
        if not self._drivers:
            # Avoid leaking a bare StopIteration from next() to the caller.
            raise LookupError(
                f"No board drivers registered; cannot resolve {driver_id!r}"
            )
        return next(iter(self._drivers.values()))

    def all(self) -> list[BoardDriver]:
        """Return all registered driver instances in registration order."""
        return list(self._drivers.values())

    def choices(self) -> list[tuple[str, str]]:
        """Return list of (DRIVER_ID, DISPLAY_NAME) for form select fields."""
        return [(d.DRIVER_ID, d.DISPLAY_NAME) for d in self._drivers.values()]

    def is_registered(self, driver_id: str) -> bool:
        """Return ``True`` if a driver is registered under *driver_id*."""
        return driver_id in self._drivers

    def __len__(self) -> int:
        """Return the number of registered drivers."""
        return len(self._drivers)

    def __repr__(self) -> str:
        """Return a debug representation listing the registered driver IDs."""
        return f"<DriverRegistry drivers={list(self._drivers)}>"
|
|
|
|
|
|
# Singleton — import this everywhere
|
|
# Shared module-level instance; it holds no drivers until discover() is called.
registry = DriverRegistry()
|