"""Synchronous eWeLink / Sonoff cloud API wrapper. Uses the same authentication scheme as SonoffLAN (requests-based, no asyncio). Supports: - Login with email / phone number - Device list retrieval (cloud) - LAN relay control (HTTP to device, with optional AES encryption) - Cloud relay control (HTTP fallback, not WebSocket — state only) """ from __future__ import annotations import base64 import hashlib import hmac import json import logging import time from typing import Optional import requests logger = logging.getLogger(__name__) # ── API endpoints ────────────────────────────────────────────────────────────── APPID = "R8Oq3y0eSZSYdKccHlrQzT1ACCOUT9Gv" API = { "cn": "https://cn-apia.coolkit.cn", "as": "https://as-apia.coolkit.cc", "us": "https://us-apia.coolkit.cc", "eu": "https://eu-apia.coolkit.cc", } # Country code → (Country, region) — subset of full list, used only for # auto-detecting the default region during login. REGIONS: dict[str, tuple[str, str]] = { "+1": ("United States", "us"), "+7": ("Russia", "eu"), "+20": ("Egypt", "eu"), "+27": ("South Africa", "eu"), "+30": ("Greece", "eu"), "+31": ("Netherlands", "eu"), "+32": ("Belgium", "eu"), "+33": ("France", "eu"), "+34": ("Spain", "eu"), "+36": ("Hungary", "eu"), "+39": ("Italy", "eu"), "+40": ("Romania", "eu"), "+41": ("Switzerland", "eu"), "+43": ("Austria", "eu"), "+44": ("UK", "eu"), "+45": ("Denmark", "eu"), "+46": ("Sweden", "eu"), "+47": ("Norway", "eu"), "+48": ("Poland", "eu"), "+49": ("Germany", "eu"), "+51": ("Peru", "us"), "+52": ("Mexico", "us"), "+54": ("Argentina", "us"), "+55": ("Brazil", "us"), "+56": ("Chile", "us"), "+57": ("Colombia", "us"), "+58": ("Venezuela", "us"), "+60": ("Malaysia", "as"), "+61": ("Australia", "us"), "+62": ("Indonesia", "as"), "+63": ("Philippines", "as"), "+64": ("New Zealand", "us"), "+65": ("Singapore", "as"), "+66": ("Thailand", "as"), "+81": ("Japan", "as"), "+82": ("South Korea", "as"), "+84": ("Vietnam", "as"), "+86": ("China", "cn"), "+90": ("Turkey", "as"), "+91": ("India", "as"), "+92": ("Pakistan", "as"), "+94": ("Sri Lanka", "as"), "+351": ("Portugal", "eu"), "+352": ("Luxembourg", "eu"), "+353": ("Ireland", "eu"), "+354": ("Iceland", "eu"), "+358": ("Finland", "eu"), "+359": ("Bulgaria", "eu"), "+370": ("Lithuania", "eu"), "+371": ("Latvia", "eu"), "+372": ("Estonia", "eu"), "+380": ("Ukraine", "eu"), "+381": ("Serbia", "eu"), "+385": ("Croatia", "eu"), "+386": ("Slovenia", "eu"), "+420": ("Czech", "eu"), "+421": ("Slovakia", "eu"), "+886": ("Taiwan, China", "as"), "+966": ("Saudi Arabia", "as"), "+971": ("United Arab Emirates", "as"), "+972": ("Israel", "as"), "+974": ("Qatar", "as"), "+351": ("Portugal", "eu"), } DEFAULT_REGION = "eu" # ── HMAC signing ─────────────────────────────────────────────────────────────── # Pre-computed from SonoffLAN's obfuscated key-derivation algorithm. # The algorithm builds a base64 string from the full REGIONS dict and indexes # into it — the result is always the same 32-byte ASCII key below. _SIGN_KEY: bytes = b"1ve5Qk9GXfUhKAn1svnKwpAlxXkMarru" def _sign(data: bytes) -> str: digest = hmac.new(_SIGN_KEY, data, hashlib.sha256).digest() return base64.b64encode(digest).decode() # ── LAN encryption (mirrors SonoffLAN local.py) ─────────────────────────────── def _pad(data: bytes, block_size: int = 16) -> bytes: padding_len = block_size - len(data) % block_size return data + bytes([padding_len]) * padding_len def _unpad(data: bytes) -> bytes: return data[: -data[-1]] def _encrypt_payload(payload: dict, device_key: str) -> dict: """AES-CBC encrypt the 'data' field of a LAN payload.""" try: from Crypto.Cipher import AES from Crypto.Hash import MD5 from Crypto.Random import get_random_bytes except ImportError: raise RuntimeError( "pycryptodome is required for encrypted LAN control. " "Install with: pip install pycryptodome" ) key = MD5.new(device_key.encode()).digest() iv = get_random_bytes(16) plaintext = json.dumps(payload["data"]).encode() cipher = AES.new(key, AES.MODE_CBC, iv=iv) ciphertext = cipher.encrypt(_pad(plaintext)) payload = dict(payload) payload["encrypt"] = True payload["data"] = base64.b64encode(ciphertext).decode() payload["iv"] = base64.b64encode(iv).decode() return payload def _decrypt_payload(data_b64: str, iv_b64: str, device_key: str) -> dict: try: from Crypto.Cipher import AES from Crypto.Hash import MD5 except ImportError: return {} key = MD5.new(device_key.encode()).digest() cipher = AES.new(key, AES.MODE_CBC, iv=base64.b64decode(iv_b64)) plaintext = _unpad(cipher.decrypt(base64.b64decode(data_b64))) return json.loads(plaintext) # ── Public API ───────────────────────────────────────────────────────────────── class EWeLinkAuthError(Exception): pass def login(username: str, password: str, country_code: str = "+1") -> dict: """Authenticate with eWeLink and return auth dict. Returns: {"at": , "region": , "apikey": } Raises: EWeLinkAuthError on wrong credentials. requests.RequestException on network errors. """ region = REGIONS.get(country_code, (None, DEFAULT_REGION))[1] payload: dict = {"password": password, "countryCode": country_code} if "@" in username: payload["email"] = username elif username.startswith("+"): payload["phoneNumber"] = username else: payload["phoneNumber"] = "+" + username data = json.dumps(payload, separators=(",", ":")).encode() headers = { "Authorization": "Sign " + _sign(data), "Content-Type": "application/json", "X-CK-Appid": APPID, } r = requests.post(f"{API[region]}/v2/user/login", data=data, headers=headers, timeout=10) r.raise_for_status() resp = r.json() # eWeLink may redirect to a different region if resp.get("error") == 10004: region = resp["data"]["region"] r = requests.post(f"{API[region]}/v2/user/login", data=data, headers=headers, timeout=10) r.raise_for_status() resp = r.json() if resp.get("error") != 0: raise EWeLinkAuthError(resp.get("msg", "Login failed")) d = resp["data"] return { "at": d["at"], "region": region, "apikey": d["user"]["apikey"], "username": username, } def get_devices(at: str, region: str) -> list[dict]: """Retrieve all devices from eWeLink cloud. Each device dict has at least: deviceid, name, params, extra.uiid, devicekey, online, localtype, ip (if LAN-reachable) """ headers = {"Authorization": f"Bearer {at}", "X-CK-Appid": APPID} url = f"{API[region]}/v2/device/thing" r = requests.get(url, headers=headers, params={"num": 0}, timeout=15) r.raise_for_status() resp = r.json() if resp.get("error") != 0: raise Exception(resp.get("msg", "Failed to fetch devices")) devices = [] for item in resp["data"].get("thingList", []): d = item.get("itemData", {}) if "deviceid" in d: devices.append(d) return devices def send_lan_command( ip: str, port: int, device_id: str, device_key: str, params: dict, command: str = "switch", timeout: int = 5, ) -> bool: """Send a control command to a Sonoff device via local LAN. Args: ip: Device IP address (without port) port: Device HTTP port (usually 8081) device_id: 10-char eWeLink device ID device_key: AES encryption key (blank string for DIY/unencrypted devices) params: e.g. {"switch": "on"} or {"switches": [{"outlet": 0, "switch": "on"}]} command: zeroconf command path (switch / switches / dimmable / etc.) timeout: HTTP request timeout in seconds Returns True on success, False on failure. """ sequence = str(int(time.time() * 1000)) payload: dict = { "sequence": sequence, "deviceid": device_id, "selfApikey": "123", "data": params, } if device_key: payload = _encrypt_payload(payload, device_key) url = f"http://{ip}:{port}/zeroconf/{command}" try: r = requests.post( url, json=payload, headers={"Connection": "close"}, timeout=timeout, ) resp = r.json() ok = resp.get("error") == 0 if ok: logger.debug("LAN %s → %s OK", device_id, params) else: logger.warning("LAN %s → %s ERR %s", device_id, params, resp) return ok except requests.RequestException as exc: logger.debug("LAN %s → %s FAILED: %s", device_id, params, exc) return False def get_device_state_lan(ip: str, port: int, device_id: str, device_key: str, timeout: int = 5) -> dict | None: """Poll a device's current state via LAN. Returns params dict or None on failure. """ sequence = str(int(time.time() * 1000)) payload: dict = { "sequence": sequence, "deviceid": device_id, "selfApikey": "123", "data": {}, } if device_key: payload = _encrypt_payload(payload, device_key) url = f"http://{ip}:{port}/zeroconf/info" try: r = requests.post( url, json=payload, headers={"Connection": "close"}, timeout=timeout, ) resp = r.json() if resp.get("error") != 0: return None # If encrypted, decrypt; otherwise data is already dict if resp.get("encrypt") and device_key: return _decrypt_payload(resp["data"], resp["iv"], device_key) data = resp.get("data", {}) if isinstance(data, str): return json.loads(data) return data except Exception as exc: logger.debug("LAN getState %s FAILED: %s", device_id, exc) return None def parse_device(raw: dict) -> dict: """Normalise a raw eWeLink cloud device dict into a clean summary.""" extra = raw.get("extra", {}) params = raw.get("params", {}) return { "device_id": raw.get("deviceid", ""), "name": raw.get("name", ""), "uiid": extra.get("uiid", 1), "model": extra.get("model", ""), "firmware": params.get("fwVersion", ""), "device_key": raw.get("devicekey", ""), "api_key": raw.get("apikey", ""), "online": raw.get("online", False), "params": params, # LAN info (if available in this snapshot — usually not in REST response) "ip_address": raw.get("localNetworkInfo", {}).get("ip", ""), "port": raw.get("localNetworkInfo", {}).get("port", 8081), }