30 lines
1.0 KiB
Python
30 lines
1.0 KiB
Python
"""Webhook handler for Olimex ESP32-C6-EVB input events."""
|
|
import logging
|
|
from aiohttp import web
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers.dispatcher import async_dispatcher_send
|
|
|
|
from .const import DOMAIN
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
_LOGGER = logging.getLogger(__name__)
|
|
|
|
|
|
async def handle_input_event(
    hass: HomeAssistant, webhook_id: str, request: web.Request
) -> web.Response:
    """Handle an input event webhook posted by the board.

    The request body is read as JSON. Its ``input`` value is used to build
    the dispatcher signal name ``f"{DOMAIN}_input_{input}_event"`` and its
    ``state`` value is sent, unchanged, as the signal's argument.

    Args:
        hass: Home Assistant instance on which the signal is dispatched.
        webhook_id: Identifier of the webhook that was called (not used by
            this handler).
        request: Incoming HTTP request carrying the JSON payload.

    Returns:
        ``{"status": "ok"}`` as a JSON response after the signal has been
        dispatched, or a JSON response with status 400 and an ``error``
        message when the body is not valid JSON, is not a JSON object, or
        has no ``input`` value.
    """
    # Guard only the body parsing: a malformed body is the client's fault
    # (400), whereas a failure while dispatching is not and must not be
    # reported to the client as a bad request.
    try:
        data = await request.json()
    except ValueError as err:
        # JSON decode errors and text decoding errors are both ValueError
        # subclasses. The details are logged but not echoed to the caller.
        _LOGGER.error("Error handling webhook: %s", err)
        return web.json_response({"error": "Invalid JSON payload"}, status=400)

    if not isinstance(data, dict):
        _LOGGER.error(
            "Error handling webhook: expected a JSON object, got %s",
            type(data).__name__,
        )
        return web.json_response(
            {"error": "Payload must be a JSON object"}, status=400
        )

    input_num = data.get("input")
    if input_num is None:
        # Without an input identifier the signal name would be
        # "..._input_None_event"; reject instead of dispatching to it.
        _LOGGER.error("Error handling webhook: missing 'input' field")
        return web.json_response({"error": "Missing 'input' field"}, status=400)

    # NOTE(review): ``state`` is forwarded as-is (including None when absent);
    # confirm against the signal's listeners whether it should be required.
    state = data.get("state")

    _LOGGER.info("Received input event: input=%s state=%s", input_num, state)

    # Dispatch signal to update binary sensors immediately
    signal = f"{DOMAIN}_input_{input_num}_event"
    async_dispatcher_send(hass, signal, state)

    return web.json_response({"status": "ok"})
|