Files
olimex_ESP32-C6-EVB/custom_components/olimex_esp32_c6/binary_sensor.py

91 lines
3.0 KiB
Python

"""Binary sensor platform for Olimex ESP32-C6-EVB."""
import logging
from homeassistant.components.binary_sensor import BinarySensorEntity
from homeassistant.config_entries import ConfigEntry
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers.entity_platform import AddEntitiesCallback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from .const import DOMAIN, NUM_INPUTS
_LOGGER = logging.getLogger(__name__)
async def async_setup_entry(
hass: HomeAssistant,
entry: ConfigEntry,
async_add_entities: AddEntitiesCallback,
) -> None:
"""Set up binary sensor entities for inputs."""
data = hass.data[DOMAIN][entry.entry_id]
sensors = [
OlimexInputSensor(hass, entry, input_num)
for input_num in range(1, NUM_INPUTS + 1)
]
async_add_entities(sensors, update_before_add=False)
class OlimexInputSensor(BinarySensorEntity):
"""Binary sensor for Olimex input pin.
State is driven exclusively by webhook POSTs from the board.
No polling is performed.
"""
_attr_should_poll = False
def __init__(self, hass: HomeAssistant, entry: ConfigEntry, input_num: int):
"""Initialize the sensor."""
self.hass = hass
self._entry = entry
self._input_num = input_num
self._attr_name = f"Input {input_num}"
self._attr_unique_id = f"{entry.entry_id}_input_{input_num}"
self._state = False
async def async_added_to_hass(self):
"""Subscribe to webhook dispatcher when entity is added."""
signal = f"{DOMAIN}_input_{self._input_num}_event"
self.async_on_remove(
async_dispatcher_connect(self.hass, signal, self._handle_webhook_event)
)
await super().async_added_to_hass()
@callback
def _handle_webhook_event(self, state):
"""Handle real-time input event received via webhook from the board."""
# Board already inverts pull-up logic before sending:
# state=True means pressed, state=False means released
self._state = state
_LOGGER.debug(
"Input %d webhook event: state=%s (sensor is_on=%s)",
self._input_num, state, self._state
)
self.async_write_ha_state()
@property
def is_on(self):
"""Return True if input is pressed."""
return self._state
@property
def device_info(self):
"""Return device information."""
try:
host = self._entry.data.get('host', 'unknown')
return {
"identifiers": {(DOMAIN, self._entry.entry_id)},
"name": f"Olimex ESP32-C6 ({host})",
"manufacturer": "Olimex",
"model": "ESP32-C6-EVB",
}
except Exception as err:
_LOGGER.debug("Error getting device info: %s", err)
return {
"identifiers": {(DOMAIN, self._entry.entry_id)},
"name": "Olimex ESP32-C6",
"manufacturer": "Olimex",
"model": "ESP32-C6-EVB",
}