"""Driver registry — auto-discovers board drivers from sub-folders. At application startup call ``registry.discover()``. Every sub-folder of ``app/drivers/`` that contains a ``driver.py`` with a ``BoardDriver`` subclass is registered automatically. No manual imports needed. Usage ----- :: from app.drivers.registry import registry driver = registry.get("olimex_esp32_c6_evb") driver.set_relay(board, 1, True) # All registered drivers (for UI drop-downs): all_drivers = registry.all() """ from __future__ import annotations import importlib import inspect import logging import os from typing import TYPE_CHECKING from app.drivers.base import BoardDriver if TYPE_CHECKING: pass logger = logging.getLogger(__name__) _DRIVERS_DIR = os.path.dirname(__file__) class DriverRegistry: def __init__(self): self._drivers: dict[str, BoardDriver] = {} # ── discovery ───────────────────────────────────────────────────────────── def discover(self) -> None: """Scan ``app/drivers/`` sub-folders and register found drivers.""" for entry in sorted(os.scandir(_DRIVERS_DIR), key=lambda e: e.name): if not entry.is_dir() or entry.name.startswith(("_", ".")): continue driver_module_path = os.path.join(entry.path, "driver.py") if not os.path.isfile(driver_module_path): logger.debug("Skipping %s — no driver.py found", entry.name) continue module_name = f"app.drivers.{entry.name}.driver" try: mod = importlib.import_module(module_name) self._register_from_module(mod, entry.name) except Exception as exc: logger.error("Failed to load driver '%s': %s", entry.name, exc, exc_info=True) def _register_from_module(self, mod, folder_name: str) -> None: for _name, obj in inspect.getmembers(mod, inspect.isclass): if ( issubclass(obj, BoardDriver) and obj is not BoardDriver and obj.DRIVER_ID ): instance = obj() self._drivers[obj.DRIVER_ID] = instance logger.info( "Board driver registered: '%s' (%s) from folder '%s'", obj.DRIVER_ID, obj.DISPLAY_NAME, folder_name, ) return logger.warning( "Folder '%s' has driver.py but no BoardDriver subclass with DRIVER_ID found", folder_name, ) # ── lookup ──────────────────────────────────────────────────────────────── def get(self, driver_id: str) -> BoardDriver | None: return self._drivers.get(driver_id) def get_or_default(self, driver_id: str) -> BoardDriver: """Return driver or fall back to the generic_esp32 stub.""" return self._drivers.get(driver_id) or self._drivers.get("generic_esp32") or next(iter(self._drivers.values())) def all(self) -> list[BoardDriver]: return list(self._drivers.values()) def choices(self) -> list[tuple[str, str]]: """Return list of (DRIVER_ID, DISPLAY_NAME) for form select fields.""" return [(d.DRIVER_ID, d.DISPLAY_NAME) for d in self._drivers.values()] def is_registered(self, driver_id: str) -> bool: return driver_id in self._drivers def __len__(self) -> int: return len(self._drivers) def __repr__(self) -> str: return f"" # Singleton — import this everywhere registry = DriverRegistry()