"""Sonoff eWeLink Gateway Driver.

This driver represents an entire eWeLink account as a single "board".
Each discovered Sonoff device is stored as a SonoffDevice record linked to
the board. Device control goes through the LAN (if device IP is known) with
automatic cloud fallback.

Key differences from hardware boards:
  - relay_num / input_num are NOT used (Sonoff devices are managed separately)
  - poll() syncs the SonoffDevice table from cloud
  - set_relay() / toggle_relay() are stubs (use set_device_channel() instead)
"""
from __future__ import annotations

import json
import logging
from typing import TYPE_CHECKING

from app.drivers.base import BoardDriver
from .ewelink_api import (
    EWeLinkAuthError,
    get_devices,
    login,
    parse_device,
    send_lan_command,
)

if TYPE_CHECKING:
    from app.models.board import Board

logger = logging.getLogger(__name__)

# Fallbacks used when the stored board/device record has no explicit value.
_DEFAULT_REGION = "eu"
_DEFAULT_LAN_PORT = 8081


class SonoffEweLinkDriver(BoardDriver):
    """Board driver that exposes a whole eWeLink account as one gateway board."""

    DRIVER_ID = "sonoff_ewelink"
    DISPLAY_NAME = "Sonoff eWeLink Gateway"
    DESCRIPTION = (
        "Control all Sonoff devices in an eWeLink account over LAN and/or Cloud"
    )
    DEFAULT_NUM_RELAYS = 0
    DEFAULT_NUM_INPUTS = 0

    # ── Abstract method stubs (not used for gateway boards) ───────────────────

    def get_relay_status(self, board: "Board", relay_num: int):
        """Stub: gateway boards have no relays of their own; always None."""
        return None

    def set_relay(self, board: "Board", relay_num: int, state: bool) -> bool:
        """Stub: always returns False; use set_device_channel() instead."""
        return False

    def toggle_relay(self, board: "Board", relay_num: int):
        """Stub: always returns None; use set_device_channel() instead."""
        return None

    def register_webhook(self, board: "Board", callback_url: str) -> bool:
        """No-op that reports success."""
        return True  # No webhook concept for this driver

    def poll(self, board: "Board") -> dict:
        """Sync device list + states from eWeLink cloud.

        Returns a minimal dict so board_service doesn't break.
        Actual device states are stored in SonoffDevice records.
        Sets ``board.is_online`` to False when the sync raises.
        """
        try:
            self.sync_devices(board)
            board.is_online = True
        except Exception as exc:
            # Polling is best-effort: report the board offline instead of
            # propagating the failure to the caller.
            logger.warning("Sonoff poll failed for board %d: %s", board.id, exc)
            board.is_online = False
        return {
            "relay_states": {},
            "input_states": {},
            "is_online": board.is_online,
        }

    # ── Gateway-specific public API ───────────────────────────────────────────

    def ensure_token(self, board: "Board") -> bool:
        """Authenticate with eWeLink if no token is stored.

        On a successful login the access token, region and API key are
        written to ``board.config`` (caller must commit).

        Returns True if a token is available, False when credentials are
        missing or the login fails.
        """
        # Work on a copy and assign a *new* dict back, rather than mutating
        # the stored object and re-assigning the same instance.
        # NOTE(review): presumably board.config is a JSON-backed attribute;
        # in-place mutation of such values can go undetected by the ORM.
        cfg = dict(board.config or {})
        if cfg.get("ewelink_at"):
            return True

        username = cfg.get("ewelink_username", "")
        password = cfg.get("ewelink_password", "")
        country = cfg.get("ewelink_country_code", "+1")
        if not username or not password:
            logger.error("Board %d has no eWeLink credentials", board.id)
            return False

        try:
            auth = login(username, password, country)
            cfg["ewelink_at"] = auth["at"]
            cfg["ewelink_region"] = auth["region"]
            cfg["ewelink_apikey"] = auth["apikey"]
            board.config = cfg
            logger.info(
                "eWeLink login OK for board %d (region=%s)",
                board.id,
                auth["region"],
            )
            return True
        except EWeLinkAuthError as exc:
            logger.error("eWeLink auth error for board %d: %s", board.id, exc)
            # Drop any empty/stale token entry so it is not persisted.
            cfg.pop("ewelink_at", None)
            board.config = cfg
            return False
        except Exception as exc:
            logger.error("eWeLink login failed for board %d: %s", board.id, exc)
            return False

    def sync_devices(self, board: "Board") -> int:
        """Fetch device list from cloud, upsert SonoffDevice records.

        If the first fetch fails with an error that looks token-related
        ("401" or "token" in the message), the stored token is cleared,
        a fresh login is attempted and the fetch is retried once. Any other
        fetch error is re-raised.

        Returns number of devices synced (0 when no token could be obtained).
        """
        from datetime import datetime

        from app import db
        from app.models.sonoff_device import SonoffDevice, _uiid_info

        if not self.ensure_token(board):
            return 0

        cfg = dict(board.config or {})
        at = cfg["ewelink_at"]
        region = cfg.get("ewelink_region", _DEFAULT_REGION)

        try:
            raw_devices = get_devices(at, region)
        except Exception as exc:
            message = str(exc)
            if "401" not in message and "token" not in message.lower():
                raise
            # Token may have expired — clear it and retry once
            cfg.pop("ewelink_at", None)
            board.config = cfg
            if not self.ensure_token(board):
                return 0
            at = board.config["ewelink_at"]
            region = board.config.get("ewelink_region", _DEFAULT_REGION)
            raw_devices = get_devices(at, region)

        # Build lookup of existing records
        existing: dict[str, SonoffDevice] = {
            d.device_id: d
            for d in SonoffDevice.query.filter_by(board_id=board.id).all()
        }

        synced = 0
        for raw in raw_devices:
            info = parse_device(raw)
            did = info["device_id"]
            if not did:
                continue

            dev = existing.get(did)
            if dev is None:
                dev = SonoffDevice(board_id=board.id, device_id=did)
                db.session.add(dev)

            # Keep a name that is already set; only fill in a missing one.
            if not dev.name:
                dev.name = info["name"]

            # Always update these from cloud
            dev.uiid = info["uiid"]
            dev.model = info["model"]
            dev.firmware = info["firmware"]
            dev.device_key = info["device_key"]
            dev.api_key = info["api_key"]
            dev.is_online = info["online"]
            dev.num_channels = _uiid_info(info["uiid"]).get("channels", 1)

            # Empty params / IP from the cloud do not overwrite stored values.
            if info["params"]:
                dev.params = info["params"]
            if info["ip_address"]:
                dev.ip_address = info["ip_address"]
                dev.port = info["port"] or _DEFAULT_LAN_PORT
            if info["online"]:
                # Naive UTC timestamp, kept as-is for compatibility with
                # values already stored by earlier syncs.
                dev.last_seen = datetime.utcnow()
            synced += 1

        db.session.commit()
        logger.info("Board %d: synced %d Sonoff devices", board.id, synced)
        return synced

    def set_device_channel(
        self,
        board: "Board",
        device_id: str,
        channel: int,
        state: bool,
    ) -> bool:
        """Turn a Sonoff device channel ON or OFF.

        Tries LAN first (when the device has a known IP), falls back to
        cloud. On success the stored device params are updated and committed.

        Returns True when either transport accepted the command, False when
        the device is unknown, no token is available, or both transports fail.
        """
        from app import db
        from app.models.sonoff_device import SonoffDevice

        dev = SonoffDevice.query.filter_by(
            board_id=board.id, device_id=device_id
        ).first()
        if not dev:
            logger.error("Device %s not found on board %d", device_id, board.id)
            return False

        switch_val = "on" if state else "off"

        # Single-channel devices take a flat "switch" param; multi-channel
        # devices take a "switches" list addressed by outlet index.
        if dev.num_channels == 1:
            params = {"switch": switch_val}
            command = "switch"
        else:
            params = {"switches": [{"outlet": channel, "switch": switch_val}]}
            command = "switches"

        def _persist() -> None:
            """Mirror the new state into the device record and commit."""
            self._apply_state(dev, channel, state)
            db.session.commit()

        # ── Try LAN first ─────────────────────────────────────────────────────
        if dev.ip_address:
            ok = send_lan_command(
                ip=dev.ip_address,
                port=dev.port or _DEFAULT_LAN_PORT,
                device_id=device_id,
                device_key=dev.device_key or "",
                params=params,
                command=command,
            )
            if ok:
                _persist()
                return True
            logger.debug("LAN failed for %s, trying cloud", device_id)

        # ── Cloud fallback ────────────────────────────────────────────────────
        if not self.ensure_token(board):
            return False

        cfg = board.config
        ok = self._cloud_set(
            at=cfg["ewelink_at"],
            region=cfg.get("ewelink_region", _DEFAULT_REGION),
            device_id=device_id,
            api_key=dev.api_key or cfg.get("ewelink_apikey", ""),
            params=params,
        )
        if ok:
            _persist()
        return ok

    # ── Private helpers ───────────────────────────────────────────────────────

    @staticmethod
    def _apply_state(dev, channel: int, state: bool) -> None:
        """Update the SonoffDevice params after a successful command.

        Builds a fresh params dict (and a fresh ``switches`` list) instead of
        mutating the stored one, and tolerates ``dev.params`` being None.
        For multi-channel devices the entry whose ``outlet`` equals *channel*
        is updated, or appended when no such entry exists.
        """
        params = dict(dev.params or {})
        switch_val = "on" if state else "off"

        if dev.num_channels == 1:
            params["switch"] = switch_val
        else:
            switches = [dict(entry) for entry in params.get("switches") or []]
            for entry in switches:
                if entry.get("outlet") == channel:
                    entry["switch"] = switch_val
                    break
            else:
                switches.append({"outlet": channel, "switch": switch_val})
            params["switches"] = switches

        dev.params = params

    @staticmethod
    def _cloud_set(
        at: str,
        region: str,
        device_id: str,
        api_key: str,
        params: dict,
    ) -> bool:
        """Send a device update via the eWeLink cloud REST API (simple HTTP POST).

        This uses the v2 update-device endpoint rather than the WebSocket,
        which is simpler for synchronous Flask code.

        ``api_key`` is accepted for interface compatibility but is not used
        by this request.

        Returns True when the response JSON has ``error == 0``; False on an
        API error, an unknown region, or any request/parse failure.
        """
        import requests as req

        from .ewelink_api import API, APPID

        try:
            base_url = API[region]
        except KeyError:
            # An unknown region is reported like any other cloud failure
            # rather than escaping as a KeyError.
            logger.error(
                "Unknown eWeLink region %r for device %s", region, device_id
            )
            return False

        headers = {"Authorization": f"Bearer {at}", "X-CK-Appid": APPID}
        # NOTE(review): the public CoolKit v2 docs appear to describe this
        # endpoint's body as {"type": 1, "id": <deviceid>, "params": ...};
        # confirm that the "deviceid" key used here is accepted.
        payload = {
            "deviceid": device_id,
            "params": params,
        }
        url = f"{base_url}/v2/device/thing/status"

        try:
            r = req.post(url, json=payload, headers=headers, timeout=8)
            resp = r.json()
            ok = resp.get("error") == 0
            if ok:
                logger.debug("Cloud %s → %s OK", device_id, params)
            else:
                logger.warning("Cloud %s → %s ERR %s", device_id, params, resp)
            return ok
        except Exception as exc:
            logger.error("Cloud send failed for %s: %s", device_id, exc)
            return False