fix: Only send JSON state over WebSocket (#27225)

This commit is contained in:
Koen Kanters
2025-05-03 08:42:07 +02:00
committed by GitHub
parent 908ecb3e2a
commit 39e4468308
9 changed files with 130 additions and 86 deletions
+11 -8
View File
@@ -1,5 +1,3 @@
import type {IClientPublishOptions} from "mqtt";
import type Extension from "./extension/extension";
import type {Zigbee2MQTTAPI} from "./types/api";
@@ -23,7 +21,7 @@ import ExtensionOnEvent from "./extension/onEvent";
import ExtensionOTAUpdate from "./extension/otaUpdate";
import ExtensionPublish from "./extension/publish";
import ExtensionReceive from "./extension/receive";
import Mqtt from "./mqtt";
import Mqtt, {type MqttPublishOptions} from "./mqtt";
import State from "./state";
import logger from "./util/logger";
import {initSdNotify} from "./util/sd-notify";
@@ -335,14 +333,19 @@ export class Controller {
message = newState;
}
const options: IClientPublishOptions = {
retain: utils.getObjectProperty(entity.options, "retain", false),
qos: utils.getObjectProperty(entity.options, "qos", 0),
const options: MakePartialExcept<MqttPublishOptions, "publishOptions" | "meta"> = {
publishOptions: {
retain: utils.getObjectProperty(entity.options, "retain", false),
qos: utils.getObjectProperty(entity.options, "qos", 0),
},
meta: {
isEntityState: true,
},
};
const retention = utils.getObjectProperty<number | false>(entity.options, "retention", false);
if (retention !== false) {
options.properties = {messageExpiryInterval: retention};
options.publishOptions.properties = {messageExpiryInterval: retention};
}
if (entity.isDevice() && settings.get().mqtt.include_device_information) {
@@ -399,7 +402,7 @@ export class Controller {
this.eventBus.emitPublishEntityState({entity, message, stateChangeReason, payload});
}
async iteratePayloadAttributeOutput(topicRoot: string, payload: KeyValue, options: IClientPublishOptions): Promise<void> {
async iteratePayloadAttributeOutput(topicRoot: string, payload: KeyValue, options: Partial<MqttPublishOptions>): Promise<void> {
for (const [key, value] of Object.entries(payload)) {
let subPayload = value;
let message = null;
+2 -2
View File
@@ -206,7 +206,7 @@ export default class Availability extends Extension {
this.eventBus.onEntityRenamed(this, async (data) => {
if (utils.isAvailabilityEnabledForEntity(data.entity, settings.get())) {
await this.mqtt.publish(`${data.from}/availability`, "", {retain: true, qos: 1});
await this.mqtt.publish(`${data.from}/availability`, "", {publishOptions: {retain: true, qos: 1}});
await this.publishAvailability(data.entity, false, true);
}
});
@@ -265,7 +265,7 @@ export default class Availability extends Extension {
const topic = `${entity.name}/availability`;
const payload: Zigbee2MQTTAPI["{friendlyName}/availability"] = {state: available ? "online" : "offline"};
this.lastPublishedAvailabilities.set(entity.ID, available);
await this.mqtt.publish(topic, JSON.stringify(payload), {retain: true, qos: 1});
await this.mqtt.publish(topic, JSON.stringify(payload), {publishOptions: {retain: true, qos: 1}});
if (!skipGroups && entity.isDevice()) {
for (const group of this.zigbee.groupsIterator()) {
+11 -12
View File
@@ -58,14 +58,13 @@ export default class Bridge extends Extension {
override async start(): Promise<void> {
const debugToMQTTFrontend = settings.get().advanced.log_debug_to_mqtt_frontend;
const baseTopic = settings.get().mqtt.base_topic;
const bridgeLogging = (message: string, level: string, namespace: string): void => {
const payload = stringify({message, level, namespace});
if (payload !== this.lastBridgeLoggingPayload) {
this.lastBridgeLoggingPayload = payload;
void this.mqtt.publish("bridge/logging", payload, {}, baseTopic, true);
void this.mqtt.publish("bridge/logging", payload, {skipLog: true});
}
};
@@ -129,7 +128,7 @@ export default class Bridge extends Extension {
data: {friendly_name: data.device.name, ieee_address: data.device.ieeeAddr},
};
await this.mqtt.publish("bridge/event", stringify(payload), {retain: false, qos: 0});
await this.mqtt.publish("bridge/event", stringify(payload), {publishOptions: {retain: false, qos: 0}});
});
this.eventBus.onDeviceLeave(this, async (data) => {
await this.publishDevices();
@@ -137,7 +136,7 @@ export default class Bridge extends Extension {
const payload: Zigbee2MQTTAPI["bridge/event"] = {type: "device_leave", data: {ieee_address: data.ieeeAddr, friendly_name: data.name}};
await this.mqtt.publish("bridge/event", stringify(payload), {retain: false, qos: 0});
await this.mqtt.publish("bridge/event", stringify(payload), {publishOptions: {retain: false, qos: 0}});
});
this.eventBus.onDeviceNetworkAddressChanged(this, async () => {
await this.publishDevices();
@@ -165,7 +164,7 @@ export default class Bridge extends Extension {
};
}
await this.mqtt.publish("bridge/event", stringify(payload), {retain: false, qos: 0});
await this.mqtt.publish("bridge/event", stringify(payload), {publishOptions: {retain: false, qos: 0}});
});
this.eventBus.onDeviceAnnounce(this, async (data) => {
await this.publishDevices();
@@ -175,7 +174,7 @@ export default class Bridge extends Extension {
data: {friendly_name: data.device.name, ieee_address: data.device.ieeeAddr},
};
await this.mqtt.publish("bridge/event", stringify(payload), {retain: false, qos: 0});
await this.mqtt.publish("bridge/event", stringify(payload), {publishOptions: {retain: false, qos: 0}});
});
await this.publishInfo();
@@ -573,7 +572,7 @@ export default class Bridge extends Extension {
settings.changeFriendlyName(from, to);
// Clear retained messages
await this.mqtt.publish(oldFriendlyName, "", {retain: true});
await this.mqtt.publish(oldFriendlyName, "", {publishOptions: {retain: true}});
this.eventBus.emitEntityRenamed({entity: entity, homeAssisantRename, from: oldFriendlyName, to});
@@ -642,7 +641,7 @@ export default class Bridge extends Extension {
this.state.remove(entity.ID);
// Clear any retained messages
await this.mqtt.publish(friendlyName, "", {retain: true});
await this.mqtt.publish(friendlyName, "", {publishOptions: {retain: true}});
logger.info(`Successfully removed ${entityType} '${friendlyName}'${blockForceLog}`);
@@ -712,7 +711,7 @@ export default class Bridge extends Extension {
config_schema: settings.schemaJson,
};
await this.mqtt.publish("bridge/info", stringify(payload), {retain: true, qos: 0}, settings.get().mqtt.base_topic, true);
await this.mqtt.publish("bridge/info", stringify(payload), {publishOptions: {retain: true, qos: 0}, skipLog: true});
}
async publishDevices(): Promise<void> {
@@ -774,7 +773,7 @@ export default class Bridge extends Extension {
});
}
await this.mqtt.publish("bridge/devices", stringify(devices), {retain: true, qos: 0}, settings.get().mqtt.base_topic, true);
await this.mqtt.publish("bridge/devices", stringify(devices), {publishOptions: {retain: true, qos: 0}, skipLog: true});
}
async publishGroups(): Promise<void> {
@@ -796,7 +795,7 @@ export default class Bridge extends Extension {
});
}
await this.mqtt.publish("bridge/groups", stringify(groups), {retain: true, qos: 0}, settings.get().mqtt.base_topic, true);
await this.mqtt.publish("bridge/groups", stringify(groups), {publishOptions: {retain: true, qos: 0}, skipLog: true});
}
async publishDefinitions(): Promise<void> {
@@ -809,7 +808,7 @@ export default class Bridge extends Extension {
data.custom_clusters[device.ieeeAddr] = device.customClusters;
}
await this.mqtt.publish("bridge/definitions", stringify(data), {retain: true, qos: 0}, settings.get().mqtt.base_topic, true);
await this.mqtt.publish("bridge/definitions", stringify(data), {publishOptions: {retain: true, qos: 0}, skipLog: true});
}
getDefinitionPayload(device: Device): Zigbee2MQTTDevice["definition"] | undefined {
+4 -10
View File
@@ -225,16 +225,10 @@ export default abstract class ExternalJSExtension<M> extends Extension {
}
private async publishExternalJS(): Promise<void> {
await this.mqtt.publish(
`bridge/${this.mqttTopic}s`,
stringify(Array.from(this.getFiles(true))),
{
retain: true,
qos: 0,
},
settings.get().mqtt.base_topic,
true,
);
await this.mqtt.publish(`bridge/${this.mqttTopic}s`, stringify(Array.from(this.getFiles(true))), {
publishOptions: {retain: true, qos: 0},
skipLog: true,
});
}
private getImportPath(filePath: string): string {
+26 -10
View File
@@ -105,7 +105,8 @@ export class Frontend extends Extension {
}
this.server.on("upgrade", this.onUpgrade);
this.eventBus.onMQTTMessagePublished(this, this.onMQTTPublishMessage);
this.eventBus.onMQTTMessagePublished(this, this.onMQTTPublishMessageOrEntityState);
this.eventBus.onPublishEntityState(this, this.onMQTTPublishMessageOrEntityState);
if (!this.host) {
this.server.listen(this.port);
@@ -217,16 +218,31 @@ export class Frontend extends Extension {
}
}
@bind private onMQTTPublishMessage(data: eventdata.MQTTMessagePublished): void {
if (data.topic.startsWith(`${this.mqttBaseTopic}/`)) {
// Send topic without base_topic
const topic = data.topic.substring(this.mqttBaseTopic.length + 1);
const payload = utils.parseJSON(data.payload, data.payload);
@bind private onMQTTPublishMessageOrEntityState(data: eventdata.MQTTMessagePublished | eventdata.PublishEntityState): void {
let topic: string;
let payload: KeyValue | string;
for (const client of this.wss.clients) {
if (client.readyState === WebSocket.OPEN) {
client.send(stringify({topic, payload}));
}
if ("topic" in data) {
// MQTTMessagePublished
if (data.options.meta.isEntityState || !data.topic.startsWith(`${this.mqttBaseTopic}/`)) {
// Don't send entity state to frontend on `MQTTMessagePublished` event, this is handled by
// `PublishEntityState` instead. Reason for this is to skip attribute messages when `output` is
// set to `attribute` or `attribute_and_json`, we only want to send JSON entity states to the
// frontend.
return;
}
// Send topic without base_topic
topic = data.topic.substring(this.mqttBaseTopic.length + 1);
payload = utils.parseJSON(data.payload, data.payload);
} else {
// PublishEntityState
topic = data.entity.name;
payload = data.payload;
}
for (const client of this.wss.clients) {
if (client.readyState === WebSocket.OPEN) {
client.send(stringify({topic, payload}));
}
}
}
+15 -7
View File
@@ -1221,7 +1221,7 @@ export class HomeAssistant extends Extension {
const discovered = this.getDiscovered(data.id);
for (const topic of Object.keys(discovered.messages)) {
await this.mqtt.publish(topic, "", {retain: true, qos: 1}, this.discoveryTopic, false, false);
await this.mqtt.publish(topic, "", {publishOptions: {retain: true, qos: 1}, baseTopic: this.discoveryTopic, skipReceive: false});
}
delete this.discovered[data.id];
@@ -1305,7 +1305,7 @@ export class HomeAssistant extends Extension {
if (data.homeAssisantRename) {
const discovered = this.getDiscovered(data.entity);
for (const topic of Object.keys(discovered.messages)) {
await this.mqtt.publish(topic, "", {retain: true, qos: 1}, this.discoveryTopic, false, false);
await this.mqtt.publish(topic, "", {publishOptions: {retain: true, qos: 1}, baseTopic: this.discoveryTopic, skipReceive: false});
}
discovered.messages = {};
@@ -1685,7 +1685,11 @@ export class HomeAssistant extends Extension {
if (!discoveredMessage || discoveredMessage.payload !== payloadStr || !discoveredMessage.published) {
discovered.messages[topic] = {payload: payloadStr, published: publish};
if (publish) {
await this.mqtt.publish(topic, payloadStr, {retain: true, qos: 1}, this.discoveryTopic, false, false);
await this.mqtt.publish(topic, payloadStr, {
publishOptions: {retain: true, qos: 1},
baseTopic: this.discoveryTopic,
skipReceive: false,
});
}
} else {
logger.debug(`Skipping discovery of '${topic}', already discovered`);
@@ -1701,7 +1705,7 @@ export class HomeAssistant extends Extension {
for (const topic of lastDiscoveredTopics) {
const isDeviceAutomation = topic.match(this.discoveryRegexWoTopic)?.[1] === "device_automation";
if (!newDiscoveredTopics.has(topic) && !isDeviceAutomation) {
await this.mqtt.publish(topic, "", {retain: true, qos: 1}, this.discoveryTopic, false, false);
await this.mqtt.publish(topic, "", {publishOptions: {retain: true, qos: 1}, baseTopic: this.discoveryTopic, skipReceive: false});
}
}
}
@@ -1751,7 +1755,7 @@ export class HomeAssistant extends Extension {
if (clear) {
logger.debug(`Clearing outdated Home Assistant config '${data.topic}'`);
await this.mqtt.publish(topic, "", {retain: true, qos: 1}, this.discoveryTopic, false, false);
await this.mqtt.publish(topic, "", {publishOptions: {retain: true, qos: 1}, baseTopic: this.discoveryTopic, skipReceive: false});
} else if (entity) {
this.getDiscovered(entity).messages[topic] = {payload: stringify(message), published: true};
}
@@ -1784,7 +1788,7 @@ export class HomeAssistant extends Extension {
for (const topic of Object.keys(discovered.messages)) {
if (topic.startsWith("scene")) {
await this.mqtt.publish(topic, "", {retain: true, qos: 1}, this.discoveryTopic, false, false);
await this.mqtt.publish(topic, "", {publishOptions: {retain: true, qos: 1}, baseTopic: this.discoveryTopic, skipReceive: false});
delete discovered.messages[topic];
}
}
@@ -1917,7 +1921,11 @@ export class HomeAssistant extends Extension {
origin: this.discoveryOrigin,
};
await this.mqtt.publish(topic, stringify(payload), {retain: true, qos: 1}, this.discoveryTopic, false, false);
await this.mqtt.publish(topic, stringify(payload), {
publishOptions: {retain: true, qos: 1},
baseTopic: this.discoveryTopic,
skipReceive: false,
});
discovered.triggers.add(discoveredKey);
}
+34 -26
View File
@@ -12,6 +12,15 @@ import * as settings from "./util/settings";
import utils from "./util/utils";
const NS = "z2m:mqtt";
const DEFAULT_CLIENT_PUBLISH_OPTIONS: IClientPublishOptions = {qos: 0 as const, retain: false};
export interface MqttPublishOptions {
publishOptions: IClientPublishOptions;
baseTopic: string;
skipLog: boolean;
skipReceive: boolean;
meta: {isEntityState?: boolean};
}
export default class Mqtt {
private publishedTopics = new Set<string>();
@@ -19,12 +28,18 @@ export default class Mqtt {
private client!: MqttClient;
private eventBus: EventBus;
private republishRetainedTimer?: NodeJS.Timeout;
public retainedMessages: {
[s: string]: {payload: string; options: IClientPublishOptions; skipLog: boolean; skipReceive: boolean; topic: string; base: string};
} = {};
private defaultPublishOptions: MqttPublishOptions;
public retainedMessages: {[s: string]: {topic: string; payload: string; options: MqttPublishOptions}} = {};
constructor(eventBus: EventBus) {
this.eventBus = eventBus;
this.defaultPublishOptions = {
publishOptions: {},
baseTopic: settings.get().mqtt.base_topic,
skipLog: false,
skipReceive: true,
meta: {},
};
}
async connect(): Promise<void> {
@@ -109,7 +124,7 @@ export default class Mqtt {
// Republish retained messages in case MQTT broker does not persist them.
// https://github.com/Koenkk/zigbee2mqtt/issues/9629
for (const msg of Object.values(this.retainedMessages)) {
await this.publish(msg.topic, msg.payload, msg.options, msg.base, msg.skipLog, msg.skipReceive);
await this.publish(msg.topic, msg.payload, msg.options);
}
}, 2000);
@@ -127,7 +142,7 @@ export default class Mqtt {
const stateData: Zigbee2MQTTAPI["bridge/state"] = {state: "offline"};
await this.publish("bridge/state", JSON.stringify(stateData), {retain: true, qos: 0});
await this.publish("bridge/state", JSON.stringify(stateData), {publishOptions: {retain: true, qos: 0}});
this.eventBus.removeListeners(this);
logger.info("Disconnecting from MQTT server");
await this.client?.endAsync();
@@ -146,7 +161,7 @@ export default class Mqtt {
const stateData: Zigbee2MQTTAPI["bridge/state"] = {state: "online"};
await this.publish("bridge/state", JSON.stringify(stateData), {retain: true, qos: 0});
await this.publish("bridge/state", JSON.stringify(stateData), {publishOptions: {retain: true, qos: 0}});
await this.subscribe(`${settings.get().mqtt.base_topic}/#`);
}
@@ -168,39 +183,32 @@ export default class Mqtt {
return this.client && !this.client.reconnecting && !this.client.disconnecting && !this.client.disconnected;
}
async publish(
topic: string,
payload: string,
options: IClientPublishOptions = {},
base = settings.get().mqtt.base_topic,
skipLog = false,
skipReceive = true,
): Promise<void> {
async publish(topic: string, payload: string, options: Partial<MqttPublishOptions> = {}): Promise<void> {
if (topic.includes("+") || topic.includes("#")) {
// https://github.com/Koenkk/zigbee2mqtt/issues/26939#issuecomment-2772309646
logger.error(`Topic '${topic}' includes wildcard characters, skipping publish.`);
return;
}
const defaultOptions = {qos: 0 as const, retain: false};
topic = `${base}/${topic}`;
const finalOptions = {...this.defaultPublishOptions, ...options};
topic = `${finalOptions.baseTopic}/${topic}`;
if (skipReceive) {
if (finalOptions.skipReceive) {
this.publishedTopics.add(topic);
}
if (options.retain) {
if (finalOptions.publishOptions.retain) {
if (payload) {
this.retainedMessages[topic] = {payload, options, skipReceive, skipLog, topic: topic.substring(base.length + 1), base};
this.retainedMessages[topic] = {payload, options: finalOptions, topic: topic.substring(finalOptions.baseTopic.length + 1)};
} else {
delete this.retainedMessages[topic];
}
}
this.eventBus.emitMQTTMessagePublished({topic, payload, options: {...defaultOptions, ...options}});
this.eventBus.emitMQTTMessagePublished({topic, payload, options: finalOptions});
if (!this.isConnected()) {
if (!skipLog) {
if (!finalOptions.skipLog) {
logger.error("Not connected to MQTT server!");
logger.error(`Cannot send message: topic: '${topic}', payload: '${payload}`);
}
@@ -208,20 +216,20 @@ export default class Mqtt {
return;
}
if (!skipLog) {
if (!finalOptions.skipLog) {
logger.info(() => `MQTT publish: topic '${topic}', payload '${payload}'`, NS);
}
const actualOptions: IClientPublishOptions = {...defaultOptions, ...options};
const publishOptions: IClientPublishOptions = {...DEFAULT_CLIENT_PUBLISH_OPTIONS, ...finalOptions.publishOptions};
if (settings.get().mqtt.force_disable_retain) {
actualOptions.retain = false;
publishOptions.retain = false;
}
try {
await this.client.publishAsync(topic, payload, actualOptions);
await this.client.publishAsync(topic, payload, publishOptions);
} catch (error) {
if (!skipLog) {
if (!finalOptions.skipLog) {
logger.error(`MQTT server error: ${(error as Error).message}`);
logger.error(`Could not send message: topic: '${topic}', payload: '${payload}`);
}
+3 -1
View File
@@ -6,6 +6,7 @@ import type TypeExtension from "../extension/extension";
import type TypeDevice from "../model/device";
import type TypeGroup from "../model/group";
import type TypeMqtt from "../mqtt";
import type {MqttPublishOptions} from "../mqtt";
import type TypeState from "../state";
import type TypeZigbee from "../zigbee";
import type {Zigbee2MQTTDeviceOptions, Zigbee2MQTTGroupOptions, Zigbee2MQTTSettings} from "./api";
@@ -24,6 +25,7 @@ declare global {
type StateChangeReason = "publishDebounce" | "groupOptimistic" | "lastSeenChanged" | "publishCached" | "publishThrottle";
type PublishEntityState = (entity: Device | Group, payload: KeyValue, stateChangeReason?: StateChangeReason) => Promise<void>;
type RecursivePartial<T> = {[P in keyof T]?: RecursivePartial<T[P]>};
type MakePartialExcept<T, K extends keyof T> = Partial<Omit<T, K>> & Pick<T, K>;
interface KeyValue {
// biome-ignore lint/suspicious/noExplicitAny: API
[s: string]: any;
@@ -49,7 +51,7 @@ declare global {
type EntityRenamed = {entity: Device | Group; homeAssisantRename: boolean; from: string; to: string};
type EntityRemoved = {id: string; name: string; type: "device"} | {id: number; name: string; type: "group"};
type MQTTMessage = {topic: string; message: string};
type MQTTMessagePublished = {topic: string; payload: string; options: {retain: boolean; qos: number}};
type MQTTMessagePublished = {topic: string; payload: string; options: MqttPublishOptions};
type StateChange = {
entity: Device | Group;
from: KeyValue;
+24 -10
View File
@@ -1,8 +1,8 @@
import * as data from "../mocks/data";
import {mockLogger} from "../mocks/logger";
import {mockMQTTPublishAsync} from "../mocks/mqtt";
import {events as mockMQTTEvents, mockMQTTPublishAsync} from "../mocks/mqtt";
import {type EventHandler, flushPromises} from "../mocks/utils";
import {devices} from "../mocks/zigbeeHerdsman";
import {devices, events as mockZHEvents} from "../mocks/zigbeeHerdsman";
import path from "node:path";
@@ -224,7 +224,7 @@ describe("Extension: Frontend", () => {
await controller.stop();
});
it("Websocket interaction", async () => {
it("onlythis Websocket interaction", async () => {
controller = new Controller(vi.fn(), vi.fn());
await controller.start();
mockWSClient.readyState = "open";
@@ -268,13 +268,27 @@ describe("Extension: Frontend", () => {
expect(mockWSClient.send).toHaveBeenCalledWith(
stringify({
topic: "bulb_color",
payload: {
state: "ON",
effect: null,
power_on_behavior: null,
linkquality: null,
update: {state: null, installed_version: -1, latest_version: -1},
},
payload: {state: "ON"},
}),
);
// Should publish bridge messages
await mockZHEvents.deviceJoined({device: devices.bulb});
await flushPromises();
expect(mockWSClient.send).toHaveBeenCalledWith(
stringify({payload: {data: {friendly_name: "bulb", ieee_address: "0x000b57fffec6a5b2"}, type: "device_joined"}, topic: "bridge/event"}),
);
// Should send JSON state event when `output: attribute`
mockWSClient.send.mockClear();
settings.set(["advanced", "output"], "attribute");
await mockMQTTEvents.message("zigbee2mqtt/bulb_color/set", stringify({brightness: 90}));
await flushPromises();
expect(mockWSClient.send).toHaveBeenCalledTimes(1);
expect(mockWSClient.send).toHaveBeenCalledWith(
stringify({
topic: "bulb_color",
payload: {state: "ON", brightness: 90},
}),
);