mirror of
https://github.com/Koenkk/zigbee2mqtt.git
synced 2026-06-22 13:11:43 +00:00
191 lines
8.4 KiB
TypeScript
Executable File
191 lines
8.4 KiB
TypeScript
Executable File
import assert from "node:assert";
|
|
|
|
import bind from "bind-decorator";
|
|
import debounce from "debounce";
|
|
import stringify from "json-stable-stringify-without-jsonify";
|
|
import throttle from "throttleit";
|
|
|
|
import * as zhc from "zigbee-herdsman-converters";
|
|
|
|
import logger from "../util/logger";
|
|
import * as settings from "../util/settings";
|
|
import utils from "../util/utils";
|
|
import Extension from "./extension";
|
|
|
|
/**
 * Shape of the debounced publish function stored per device: a zero-argument
 * callable that can also be cancelled (`clear`) or invoked immediately (`flush`).
 */
type DebounceFunction = (() => void) & {
    clear(): void;
    flush(): void;
};
|
|
|
|
/**
 * Extension that turns incoming device messages into published entity state.
 *
 * For every device message it selects the matching `fromZigbee` converters of the
 * device definition, merges their results into one payload and publishes it,
 * optionally debounced or throttled according to the device options.
 */
export default class Receive extends Extension {
    // TODO: move all to `Map`
    /** `Date.now()` of the previously published message, keyed by device IEEE address (used for `payload.elapsed`). */
    private elapsed: {[s: string]: number} = {};
    /** Pending debounced payload and its debounced publish function, keyed by device IEEE address. */
    private debouncers: {[s: string]: {payload: KeyValue; publish: DebounceFunction}} = {};
    /** Throttled publish function, keyed by device IEEE address. */
    private throttlers: {[s: string]: {publish: PublishEntityState}} = {};

    /** Subscribes this extension to entity-state-publish and device-message events. */
    // biome-ignore lint/suspicious/useAwait: API
    override async start(): Promise<void> {
        this.eventBus.onPublishEntityState(this, this.onPublishEntityState);
        this.eventBus.onDeviceMessage(this, this.onDeviceMessage);
    }

    /**
     * Prevent that outdated properties are being published.
     * In case that e.g. the state is currently held back by a debounce and a new state is published,
     * remove it from the to-be-sent debounced message.
     *
     * Publishes caused by the debouncer itself (`"publishDebounce"`) and by
     * `"lastSeenChanged"` leave the pending payload untouched.
     */
    @bind onPublishEntityState(data: eventdata.PublishEntityState): void {
        if (!data.entity.isDevice()) return;

        const debouncer = this.debouncers[data.entity.ieeeAddr];
        if (!debouncer) return;

        if (data.stateChangeReason === "publishDebounce" || data.stateChangeReason === "lastSeenChanged") return;

        for (const key of Object.keys(data.payload)) {
            delete debouncer.payload[key];
        }
    }

    /**
     * Accumulates `payload` for `device` and publishes the accumulated payload once the debounce time has passed.
     *
     * @param device Device the payload belongs to.
     * @param payload Newly converted payload to merge into the pending one.
     * @param time Debounce time in seconds. Only used when the debouncer for this device is first created.
     * @param debounceIgnore Keys whose changed value forces the pending payload to be published immediately.
     */
    publishDebounce(device: Device, payload: KeyValue, time: number, debounceIgnore: string[] | undefined): void {
        if (!this.debouncers[device.ieeeAddr]) {
            this.debouncers[device.ieeeAddr] = {
                payload: {},
                publish: debounce(async () => {
                    // NOTE(review): the pending payload is cleared only after the awaited publish resolves, so
                    // anything merged into it by `publishDebounce` while the publish is pending is cleared as well
                    // (the state cache has already been updated at that point). Confirm this is intended.
                    await this.publishEntityState(device, this.debouncers[device.ieeeAddr].payload, "publishDebounce");
                    this.debouncers[device.ieeeAddr].payload = {};
                }, time * 1000),
            };
        }

        const debouncer = this.debouncers[device.ieeeAddr];

        if (this.isPayloadConflicted(payload, debouncer.payload, debounceIgnore)) {
            // publish previous payload immediately
            debouncer.publish.flush();
        }

        // extend debounced payload with current
        debouncer.payload = {...debouncer.payload, ...payload};

        // Update state cache right away. This makes sure that during debouncing cached state is always up to date.
        // (Update right away as a "lastSeenChanged" event might occur while the debouncer is still active.
        // If that happens it would cause an old message to be published from cache.)
        this.state.set(device, debouncer.payload);

        debouncer.publish();
    }

    /**
     * Publishes `payload` for `device` through a per-device throttled publish function.
     *
     * @param device Device the payload belongs to.
     * @param payload Converted payload to publish.
     * @param time Throttle time in seconds. Only used when the throttler for this device is first created.
     */
    async publishThrottle(device: Device, payload: KeyValue, time: number): Promise<void> {
        if (!this.throttlers[device.ieeeAddr]) {
            this.throttlers[device.ieeeAddr] = {
                publish: throttle(this.publishEntityState, time * 1000),
            };
        }

        // Update state cache right away. This makes sure that during throttling cached state is always up to date.
        this.state.set(device, payload);

        await this.throttlers[device.ieeeAddr].publish(device, payload, "publishThrottle");
    }

    /**
     * Determines whether `newPayload` conflicts with the pending `oldPayload`.
     *
     * If `debounceIgnore` is specified, every `newPayload` value whose key is listed in it
     * must either be undefined or equal the value in `oldPayload`; otherwise the payload is conflicted.
     *
     * @param newPayload Newly converted payload.
     * @param oldPayload Payload currently held back by the debouncer.
     * @param debounceIgnore Keys to compare; without it nothing is ever conflicted.
     * @returns `true` when at least one listed key is present in both payloads with a different value.
     */
    isPayloadConflicted(newPayload: KeyValue, oldPayload: KeyValue, debounceIgnore: string[] | undefined): boolean {
        // Without an ignore list no key can conflict.
        if (!debounceIgnore || debounceIgnore.length === 0) {
            return false;
        }

        for (const key in oldPayload) {
            if (debounceIgnore.includes(key) && typeof newPayload[key] !== "undefined" && newPayload[key] !== oldPayload[key]) {
                return true;
            }
        }

        return false;
    }

    /**
     * Handles a message received from a device: converts it with the matching `fromZigbee`
     * converters and publishes the result. When nothing is converted, only the
     * last-seen publish is triggered.
     */
    @bind async onDeviceMessage(data: eventdata.DeviceMessage): Promise<void> {
        /* v8 ignore next */
        if (!data.device) return;

        if (!data.device.definition || !data.device.interviewed) {
            logger.debug("Skipping message, still interviewing");
            await this.publishLastSeenForMessage(data.device);
            return;
        }

        const converters = data.device.definition.fromZigbee.filter((c) => {
            const type = Array.isArray(c.type) ? c.type.includes(data.type) : c.type === data.type;
            return c.cluster === data.cluster && type;
        });

        // Check if there is an available converter. Messages of the clusters listed below are
        // not interesting, so a missing converter is not logged for them.
        const ignoreClusters: (string | number)[] = ["genOta", "genTime", "genBasic", "genPollCtrl"];
        if (converters.length === 0 && !ignoreClusters.includes(data.cluster)) {
            logger.debug(
                `No converter available for '${data.device.definition.model}' with ` +
                    `cluster '${data.cluster}' and type '${data.type}' and data '${stringify(data.data)}'`,
            );
            await this.publishLastSeenForMessage(data.device);
            return;
        }

        // Convert this Zigbee message to a MQTT message.
        // Get payload for the message.
        // - If a payload is returned publish it to the MQTT broker
        // - If NO payload is returned do nothing. This is for non-standard behaviour
        //   for e.g. click switches where we need to count number of clicks and detect long presses.
        const publish = async (payload: KeyValue): Promise<void> => {
            assert(data.device.definition);
            const options: KeyValue = data.device.options;
            zhc.postProcessConvertedFromZigbeeMessage(data.device.definition, payload, options, data.device.zh);

            if (settings.get().advanced.elapsed) {
                const now = Date.now();
                if (this.elapsed[data.device.ieeeAddr]) {
                    // Milliseconds since the previous message published for this device.
                    payload.elapsed = now - this.elapsed[data.device.ieeeAddr];
                }

                this.elapsed[data.device.ieeeAddr] = now;
            }

            // Check if we have to debounce or throttle
            if (data.device.options.debounce) {
                this.publishDebounce(data.device, payload, data.device.options.debounce, data.device.options.debounce_ignore);
            } else if (data.device.options.throttle) {
                await this.publishThrottle(data.device, payload, data.device.options.throttle);
            } else {
                await this.publishEntityState(data.device, payload);
            }
        };

        const meta = {
            device: data.device.zh,
            logger,
            state: this.state.get(data.device),
            deviceExposesChanged: (): void => this.eventBus.emitExposesAndDevicesChanged(data.device),
        };
        let payload: KeyValue = {};
        for (const converter of converters) {
            try {
                const convertData = {...data, device: data.device.zh};
                const options: KeyValue = data.device.options;
                const converted = await converter.convert(data.device.definition, convertData, publish, options, meta);
                if (converted) {
                    // Later converters override keys produced by earlier ones.
                    payload = {...payload, ...converted};
                }
                /* v8 ignore start */
            } catch (error) {
                // A failing converter is logged and skipped so the remaining converters still run.
                logger.error(`Exception while calling fromZigbee converter: ${(error as Error).message}`);
                // biome-ignore lint/style/noNonNullAssertion: always Error
                logger.debug((error as Error).stack!);
            }
            /* v8 ignore stop */
        }

        if (!utils.objectIsEmpty(payload)) {
            await publish(payload);
        } else {
            await this.publishLastSeenForMessage(data.device);
        }
    }

    /** Invokes `utils.publishLastSeen` for `device` with reason `"messageEmitted"`; used when a message yields no payload to publish. */
    private async publishLastSeenForMessage(device: Device): Promise<void> {
        await utils.publishLastSeen({device, reason: "messageEmitted"}, settings.get(), true, this.publishEntityState);
    }
}
|