diff --git a/lib/controller.ts b/lib/controller.ts index 902f958c..84cd6dc4 100644 --- a/lib/controller.ts +++ b/lib/controller.ts @@ -1,5 +1,3 @@ -import type {IClientPublishOptions} from "mqtt"; - import type Extension from "./extension/extension"; import type {Zigbee2MQTTAPI} from "./types/api"; @@ -23,7 +21,7 @@ import ExtensionOnEvent from "./extension/onEvent"; import ExtensionOTAUpdate from "./extension/otaUpdate"; import ExtensionPublish from "./extension/publish"; import ExtensionReceive from "./extension/receive"; -import Mqtt from "./mqtt"; +import Mqtt, {type MqttPublishOptions} from "./mqtt"; import State from "./state"; import logger from "./util/logger"; import {initSdNotify} from "./util/sd-notify"; @@ -335,14 +333,19 @@ export class Controller { message = newState; } - const options: IClientPublishOptions = { - retain: utils.getObjectProperty(entity.options, "retain", false), - qos: utils.getObjectProperty(entity.options, "qos", 0), + const options: MakePartialExcept = { + publishOptions: { + retain: utils.getObjectProperty(entity.options, "retain", false), + qos: utils.getObjectProperty(entity.options, "qos", 0), + }, + meta: { + isEntityState: true, + }, }; const retention = utils.getObjectProperty(entity.options, "retention", false); if (retention !== false) { - options.properties = {messageExpiryInterval: retention}; + options.publishOptions.properties = {messageExpiryInterval: retention}; } if (entity.isDevice() && settings.get().mqtt.include_device_information) { @@ -399,7 +402,7 @@ export class Controller { this.eventBus.emitPublishEntityState({entity, message, stateChangeReason, payload}); } - async iteratePayloadAttributeOutput(topicRoot: string, payload: KeyValue, options: IClientPublishOptions): Promise { + async iteratePayloadAttributeOutput(topicRoot: string, payload: KeyValue, options: Partial): Promise { for (const [key, value] of Object.entries(payload)) { let subPayload = value; let message = null; diff --git a/lib/extension/availability.ts b/lib/extension/availability.ts index eabf22e0..69a803de 100644 --- a/lib/extension/availability.ts +++ b/lib/extension/availability.ts @@ -206,7 +206,7 @@ export default class Availability extends Extension { this.eventBus.onEntityRenamed(this, async (data) => { if (utils.isAvailabilityEnabledForEntity(data.entity, settings.get())) { - await this.mqtt.publish(`${data.from}/availability`, "", {retain: true, qos: 1}); + await this.mqtt.publish(`${data.from}/availability`, "", {publishOptions: {retain: true, qos: 1}}); await this.publishAvailability(data.entity, false, true); } }); @@ -265,7 +265,7 @@ export default class Availability extends Extension { const topic = `${entity.name}/availability`; const payload: Zigbee2MQTTAPI["{friendlyName}/availability"] = {state: available ? "online" : "offline"}; this.lastPublishedAvailabilities.set(entity.ID, available); - await this.mqtt.publish(topic, JSON.stringify(payload), {retain: true, qos: 1}); + await this.mqtt.publish(topic, JSON.stringify(payload), {publishOptions: {retain: true, qos: 1}}); if (!skipGroups && entity.isDevice()) { for (const group of this.zigbee.groupsIterator()) { diff --git a/lib/extension/bridge.ts b/lib/extension/bridge.ts index 455c4f6e..68043710 100644 --- a/lib/extension/bridge.ts +++ b/lib/extension/bridge.ts @@ -58,14 +58,13 @@ export default class Bridge extends Extension { override async start(): Promise { const debugToMQTTFrontend = settings.get().advanced.log_debug_to_mqtt_frontend; - const baseTopic = settings.get().mqtt.base_topic; const bridgeLogging = (message: string, level: string, namespace: string): void => { const payload = stringify({message, level, namespace}); if (payload !== this.lastBridgeLoggingPayload) { this.lastBridgeLoggingPayload = payload; - void this.mqtt.publish("bridge/logging", payload, {}, baseTopic, true); + void this.mqtt.publish("bridge/logging", payload, {skipLog: true}); } }; @@ -129,7 +128,7 @@ export default class Bridge extends Extension { data: {friendly_name: data.device.name, ieee_address: data.device.ieeeAddr}, }; - await this.mqtt.publish("bridge/event", stringify(payload), {retain: false, qos: 0}); + await this.mqtt.publish("bridge/event", stringify(payload), {publishOptions: {retain: false, qos: 0}}); }); this.eventBus.onDeviceLeave(this, async (data) => { await this.publishDevices(); @@ -137,7 +136,7 @@ export default class Bridge extends Extension { const payload: Zigbee2MQTTAPI["bridge/event"] = {type: "device_leave", data: {ieee_address: data.ieeeAddr, friendly_name: data.name}}; - await this.mqtt.publish("bridge/event", stringify(payload), {retain: false, qos: 0}); + await this.mqtt.publish("bridge/event", stringify(payload), {publishOptions: {retain: false, qos: 0}}); }); this.eventBus.onDeviceNetworkAddressChanged(this, async () => { await this.publishDevices(); @@ -165,7 +164,7 @@ export default class Bridge extends Extension { }; } - await this.mqtt.publish("bridge/event", stringify(payload), {retain: false, qos: 0}); + await this.mqtt.publish("bridge/event", stringify(payload), {publishOptions: {retain: false, qos: 0}}); }); this.eventBus.onDeviceAnnounce(this, async (data) => { await this.publishDevices(); @@ -175,7 +174,7 @@ export default class Bridge extends Extension { data: {friendly_name: data.device.name, ieee_address: data.device.ieeeAddr}, }; - await this.mqtt.publish("bridge/event", stringify(payload), {retain: false, qos: 0}); + await this.mqtt.publish("bridge/event", stringify(payload), {publishOptions: {retain: false, qos: 0}}); }); await this.publishInfo(); @@ -573,7 +572,7 @@ export default class Bridge extends Extension { settings.changeFriendlyName(from, to); // Clear retained messages - await this.mqtt.publish(oldFriendlyName, "", {retain: true}); + await this.mqtt.publish(oldFriendlyName, "", {publishOptions: {retain: true}}); this.eventBus.emitEntityRenamed({entity: entity, homeAssisantRename, from: oldFriendlyName, to}); @@ -642,7 +641,7 @@ export default class Bridge extends Extension { this.state.remove(entity.ID); // Clear any retained messages - await this.mqtt.publish(friendlyName, "", {retain: true}); + await this.mqtt.publish(friendlyName, "", {publishOptions: {retain: true}}); logger.info(`Successfully removed ${entityType} '${friendlyName}'${blockForceLog}`); @@ -712,7 +711,7 @@ export default class Bridge extends Extension { config_schema: settings.schemaJson, }; - await this.mqtt.publish("bridge/info", stringify(payload), {retain: true, qos: 0}, settings.get().mqtt.base_topic, true); + await this.mqtt.publish("bridge/info", stringify(payload), {publishOptions: {retain: true, qos: 0}, skipLog: true}); } async publishDevices(): Promise { @@ -774,7 +773,7 @@ export default class Bridge extends Extension { }); } - await this.mqtt.publish("bridge/devices", stringify(devices), {retain: true, qos: 0}, settings.get().mqtt.base_topic, true); + await this.mqtt.publish("bridge/devices", stringify(devices), {publishOptions: {retain: true, qos: 0}, skipLog: true}); } async publishGroups(): Promise { @@ -796,7 +795,7 @@ export default class Bridge extends Extension { }); } - await this.mqtt.publish("bridge/groups", stringify(groups), {retain: true, qos: 0}, settings.get().mqtt.base_topic, true); + await this.mqtt.publish("bridge/groups", stringify(groups), {publishOptions: {retain: true, qos: 0}, skipLog: true}); } async publishDefinitions(): Promise { @@ -809,7 +808,7 @@ export default class Bridge extends Extension { data.custom_clusters[device.ieeeAddr] = device.customClusters; } - await this.mqtt.publish("bridge/definitions", stringify(data), {retain: true, qos: 0}, settings.get().mqtt.base_topic, true); + await this.mqtt.publish("bridge/definitions", stringify(data), {publishOptions: {retain: true, qos: 0}, skipLog: true}); } getDefinitionPayload(device: Device): Zigbee2MQTTDevice["definition"] | undefined { diff --git a/lib/extension/externalJS.ts b/lib/extension/externalJS.ts index e290a544..9777dc52 100644 --- a/lib/extension/externalJS.ts +++ b/lib/extension/externalJS.ts @@ -225,16 +225,10 @@ export default abstract class ExternalJSExtension extends Extension { } private async publishExternalJS(): Promise { - await this.mqtt.publish( - `bridge/${this.mqttTopic}s`, - stringify(Array.from(this.getFiles(true))), - { - retain: true, - qos: 0, - }, - settings.get().mqtt.base_topic, - true, - ); + await this.mqtt.publish(`bridge/${this.mqttTopic}s`, stringify(Array.from(this.getFiles(true))), { + publishOptions: {retain: true, qos: 0}, + skipLog: true, + }); } private getImportPath(filePath: string): string { diff --git a/lib/extension/frontend.ts b/lib/extension/frontend.ts index 35a1ee4c..0a6fcf87 100644 --- a/lib/extension/frontend.ts +++ b/lib/extension/frontend.ts @@ -105,7 +105,8 @@ export class Frontend extends Extension { } this.server.on("upgrade", this.onUpgrade); - this.eventBus.onMQTTMessagePublished(this, this.onMQTTPublishMessage); + this.eventBus.onMQTTMessagePublished(this, this.onMQTTPublishMessageOrEntityState); + this.eventBus.onPublishEntityState(this, this.onMQTTPublishMessageOrEntityState); if (!this.host) { this.server.listen(this.port); @@ -217,16 +218,31 @@ export class Frontend extends Extension { } } - @bind private onMQTTPublishMessage(data: eventdata.MQTTMessagePublished): void { - if (data.topic.startsWith(`${this.mqttBaseTopic}/`)) { - // Send topic without base_topic - const topic = data.topic.substring(this.mqttBaseTopic.length + 1); - const payload = utils.parseJSON(data.payload, data.payload); + @bind private onMQTTPublishMessageOrEntityState(data: eventdata.MQTTMessagePublished | eventdata.PublishEntityState): void { + let topic: string; + let payload: KeyValue | string; - for (const client of this.wss.clients) { - if (client.readyState === WebSocket.OPEN) { - client.send(stringify({topic, payload})); - } + if ("topic" in data) { + // MQTTMessagePublished + if (data.options.meta.isEntityState || !data.topic.startsWith(`${this.mqttBaseTopic}/`)) { + // Don't send entity state to frontend on `MQTTMessagePublished` event, this is handled by + // `PublishEntityState` instead. Reason for this is to skip attribute messages when `output` is + // set to `attribute` or `attribute_and_json`, we only want to send JSON entity states to the + // frontend. + return; + } + // Send topic without base_topic + topic = data.topic.substring(this.mqttBaseTopic.length + 1); + payload = utils.parseJSON(data.payload, data.payload); + } else { + // PublishEntityState + topic = data.entity.name; + payload = data.payload; + } + + for (const client of this.wss.clients) { + if (client.readyState === WebSocket.OPEN) { + client.send(stringify({topic, payload})); } } } diff --git a/lib/extension/homeassistant.ts b/lib/extension/homeassistant.ts index 1609eb61..da3c1b83 100644 --- a/lib/extension/homeassistant.ts +++ b/lib/extension/homeassistant.ts @@ -1221,7 +1221,7 @@ export class HomeAssistant extends Extension { const discovered = this.getDiscovered(data.id); for (const topic of Object.keys(discovered.messages)) { - await this.mqtt.publish(topic, "", {retain: true, qos: 1}, this.discoveryTopic, false, false); + await this.mqtt.publish(topic, "", {publishOptions: {retain: true, qos: 1}, baseTopic: this.discoveryTopic, skipReceive: false}); } delete this.discovered[data.id]; @@ -1305,7 +1305,7 @@ export class HomeAssistant extends Extension { if (data.homeAssisantRename) { const discovered = this.getDiscovered(data.entity); for (const topic of Object.keys(discovered.messages)) { - await this.mqtt.publish(topic, "", {retain: true, qos: 1}, this.discoveryTopic, false, false); + await this.mqtt.publish(topic, "", {publishOptions: {retain: true, qos: 1}, baseTopic: this.discoveryTopic, skipReceive: false}); } discovered.messages = {}; @@ -1685,7 +1685,11 @@ export class HomeAssistant extends Extension { if (!discoveredMessage || discoveredMessage.payload !== payloadStr || !discoveredMessage.published) { discovered.messages[topic] = {payload: payloadStr, published: publish}; if (publish) { - await this.mqtt.publish(topic, payloadStr, {retain: true, qos: 1}, this.discoveryTopic, false, false); + await this.mqtt.publish(topic, payloadStr, { + publishOptions: {retain: true, qos: 1}, + baseTopic: this.discoveryTopic, + skipReceive: false, + }); } } else { logger.debug(`Skipping discovery of '${topic}', already discovered`); @@ -1701,7 +1705,7 @@ export class HomeAssistant extends Extension { for (const topic of lastDiscoveredTopics) { const isDeviceAutomation = topic.match(this.discoveryRegexWoTopic)?.[1] === "device_automation"; if (!newDiscoveredTopics.has(topic) && !isDeviceAutomation) { - await this.mqtt.publish(topic, "", {retain: true, qos: 1}, this.discoveryTopic, false, false); + await this.mqtt.publish(topic, "", {publishOptions: {retain: true, qos: 1}, baseTopic: this.discoveryTopic, skipReceive: false}); } } } @@ -1751,7 +1755,7 @@ export class HomeAssistant extends Extension { if (clear) { logger.debug(`Clearing outdated Home Assistant config '${data.topic}'`); - await this.mqtt.publish(topic, "", {retain: true, qos: 1}, this.discoveryTopic, false, false); + await this.mqtt.publish(topic, "", {publishOptions: {retain: true, qos: 1}, baseTopic: this.discoveryTopic, skipReceive: false}); } else if (entity) { this.getDiscovered(entity).messages[topic] = {payload: stringify(message), published: true}; } @@ -1784,7 +1788,7 @@ export class HomeAssistant extends Extension { for (const topic of Object.keys(discovered.messages)) { if (topic.startsWith("scene")) { - await this.mqtt.publish(topic, "", {retain: true, qos: 1}, this.discoveryTopic, false, false); + await this.mqtt.publish(topic, "", {publishOptions: {retain: true, qos: 1}, baseTopic: this.discoveryTopic, skipReceive: false}); delete discovered.messages[topic]; } } @@ -1917,7 +1921,11 @@ export class HomeAssistant extends Extension { origin: this.discoveryOrigin, }; - await this.mqtt.publish(topic, stringify(payload), {retain: true, qos: 1}, this.discoveryTopic, false, false); + await this.mqtt.publish(topic, stringify(payload), { + publishOptions: {retain: true, qos: 1}, + baseTopic: this.discoveryTopic, + skipReceive: false, + }); discovered.triggers.add(discoveredKey); } diff --git a/lib/mqtt.ts b/lib/mqtt.ts index d5262239..4c268441 100644 --- a/lib/mqtt.ts +++ b/lib/mqtt.ts @@ -12,6 +12,15 @@ import * as settings from "./util/settings"; import utils from "./util/utils"; const NS = "z2m:mqtt"; +const DEFAULT_CLIENT_PUBLISH_OPTIONS: IClientPublishOptions = {qos: 0 as const, retain: false}; + +export interface MqttPublishOptions { + publishOptions: IClientPublishOptions; + baseTopic: string; + skipLog: boolean; + skipReceive: boolean; + meta: {isEntityState?: boolean}; +} export default class Mqtt { private publishedTopics = new Set(); @@ -19,12 +28,18 @@ export default class Mqtt { private client!: MqttClient; private eventBus: EventBus; private republishRetainedTimer?: NodeJS.Timeout; - public retainedMessages: { - [s: string]: {payload: string; options: IClientPublishOptions; skipLog: boolean; skipReceive: boolean; topic: string; base: string}; - } = {}; + private defaultPublishOptions: MqttPublishOptions; + public retainedMessages: {[s: string]: {topic: string; payload: string; options: MqttPublishOptions}} = {}; constructor(eventBus: EventBus) { this.eventBus = eventBus; + this.defaultPublishOptions = { + publishOptions: {}, + baseTopic: settings.get().mqtt.base_topic, + skipLog: false, + skipReceive: true, + meta: {}, + }; } async connect(): Promise { @@ -109,7 +124,7 @@ export default class Mqtt { // Republish retained messages in case MQTT broker does not persist them. // https://github.com/Koenkk/zigbee2mqtt/issues/9629 for (const msg of Object.values(this.retainedMessages)) { - await this.publish(msg.topic, msg.payload, msg.options, msg.base, msg.skipLog, msg.skipReceive); + await this.publish(msg.topic, msg.payload, msg.options); } }, 2000); @@ -127,7 +142,7 @@ export default class Mqtt { const stateData: Zigbee2MQTTAPI["bridge/state"] = {state: "offline"}; - await this.publish("bridge/state", JSON.stringify(stateData), {retain: true, qos: 0}); + await this.publish("bridge/state", JSON.stringify(stateData), {publishOptions: {retain: true, qos: 0}}); this.eventBus.removeListeners(this); logger.info("Disconnecting from MQTT server"); await this.client?.endAsync(); @@ -146,7 +161,7 @@ export default class Mqtt { const stateData: Zigbee2MQTTAPI["bridge/state"] = {state: "online"}; - await this.publish("bridge/state", JSON.stringify(stateData), {retain: true, qos: 0}); + await this.publish("bridge/state", JSON.stringify(stateData), {publishOptions: {retain: true, qos: 0}}); await this.subscribe(`${settings.get().mqtt.base_topic}/#`); } @@ -168,39 +183,32 @@ export default class Mqtt { return this.client && !this.client.reconnecting && !this.client.disconnecting && !this.client.disconnected; } - async publish( - topic: string, - payload: string, - options: IClientPublishOptions = {}, - base = settings.get().mqtt.base_topic, - skipLog = false, - skipReceive = true, - ): Promise { + async publish(topic: string, payload: string, options: Partial = {}): Promise { if (topic.includes("+") || topic.includes("#")) { // https://github.com/Koenkk/zigbee2mqtt/issues/26939#issuecomment-2772309646 logger.error(`Topic '${topic}' includes wildcard characters, skipping publish.`); return; } - const defaultOptions = {qos: 0 as const, retain: false}; - topic = `${base}/${topic}`; + const finalOptions = {...this.defaultPublishOptions, ...options}; + topic = `${finalOptions.baseTopic}/${topic}`; - if (skipReceive) { + if (finalOptions.skipReceive) { this.publishedTopics.add(topic); } - if (options.retain) { + if (finalOptions.publishOptions.retain) { if (payload) { - this.retainedMessages[topic] = {payload, options, skipReceive, skipLog, topic: topic.substring(base.length + 1), base}; + this.retainedMessages[topic] = {payload, options: finalOptions, topic: topic.substring(finalOptions.baseTopic.length + 1)}; } else { delete this.retainedMessages[topic]; } } - this.eventBus.emitMQTTMessagePublished({topic, payload, options: {...defaultOptions, ...options}}); + this.eventBus.emitMQTTMessagePublished({topic, payload, options: finalOptions}); if (!this.isConnected()) { - if (!skipLog) { + if (!finalOptions.skipLog) { logger.error("Not connected to MQTT server!"); logger.error(`Cannot send message: topic: '${topic}', payload: '${payload}`); } @@ -208,20 +216,20 @@ export default class Mqtt { return; } - if (!skipLog) { + if (!finalOptions.skipLog) { logger.info(() => `MQTT publish: topic '${topic}', payload '${payload}'`, NS); } - const actualOptions: IClientPublishOptions = {...defaultOptions, ...options}; + const publishOptions: IClientPublishOptions = {...DEFAULT_CLIENT_PUBLISH_OPTIONS, ...finalOptions.publishOptions}; if (settings.get().mqtt.force_disable_retain) { - actualOptions.retain = false; + publishOptions.retain = false; } try { - await this.client.publishAsync(topic, payload, actualOptions); + await this.client.publishAsync(topic, payload, publishOptions); } catch (error) { - if (!skipLog) { + if (!finalOptions.skipLog) { logger.error(`MQTT server error: ${(error as Error).message}`); logger.error(`Could not send message: topic: '${topic}', payload: '${payload}`); } diff --git a/lib/types/types.d.ts b/lib/types/types.d.ts index 6025e0e2..7434b62c 100644 --- a/lib/types/types.d.ts +++ b/lib/types/types.d.ts @@ -6,6 +6,7 @@ import type TypeExtension from "../extension/extension"; import type TypeDevice from "../model/device"; import type TypeGroup from "../model/group"; import type TypeMqtt from "../mqtt"; +import type {MqttPublishOptions} from "../mqtt"; import type TypeState from "../state"; import type TypeZigbee from "../zigbee"; import type {Zigbee2MQTTDeviceOptions, Zigbee2MQTTGroupOptions, Zigbee2MQTTSettings} from "./api"; @@ -24,6 +25,7 @@ declare global { type StateChangeReason = "publishDebounce" | "groupOptimistic" | "lastSeenChanged" | "publishCached" | "publishThrottle"; type PublishEntityState = (entity: Device | Group, payload: KeyValue, stateChangeReason?: StateChangeReason) => Promise; type RecursivePartial = {[P in keyof T]?: RecursivePartial}; + type MakePartialExcept = Partial> & Pick; interface KeyValue { // biome-ignore lint/suspicious/noExplicitAny: API [s: string]: any; @@ -49,7 +51,7 @@ declare global { type EntityRenamed = {entity: Device | Group; homeAssisantRename: boolean; from: string; to: string}; type EntityRemoved = {id: string; name: string; type: "device"} | {id: number; name: string; type: "group"}; type MQTTMessage = {topic: string; message: string}; - type MQTTMessagePublished = {topic: string; payload: string; options: {retain: boolean; qos: number}}; + type MQTTMessagePublished = {topic: string; payload: string; options: MqttPublishOptions}; type StateChange = { entity: Device | Group; from: KeyValue; diff --git a/test/extensions/frontend.test.ts b/test/extensions/frontend.test.ts index aa3fb09b..6906b159 100644 --- a/test/extensions/frontend.test.ts +++ b/test/extensions/frontend.test.ts @@ -1,8 +1,8 @@ import * as data from "../mocks/data"; import {mockLogger} from "../mocks/logger"; -import {mockMQTTPublishAsync} from "../mocks/mqtt"; +import {events as mockMQTTEvents, mockMQTTPublishAsync} from "../mocks/mqtt"; import {type EventHandler, flushPromises} from "../mocks/utils"; -import {devices} from "../mocks/zigbeeHerdsman"; +import {devices, events as mockZHEvents} from "../mocks/zigbeeHerdsman"; import path from "node:path"; @@ -224,7 +224,7 @@ describe("Extension: Frontend", () => { await controller.stop(); }); - it("Websocket interaction", async () => { + it("onlythis Websocket interaction", async () => { controller = new Controller(vi.fn(), vi.fn()); await controller.start(); mockWSClient.readyState = "open"; @@ -268,13 +268,27 @@ describe("Extension: Frontend", () => { expect(mockWSClient.send).toHaveBeenCalledWith( stringify({ topic: "bulb_color", - payload: { - state: "ON", - effect: null, - power_on_behavior: null, - linkquality: null, - update: {state: null, installed_version: -1, latest_version: -1}, - }, + payload: {state: "ON"}, + }), + ); + + // Should publish bridge messages + await mockZHEvents.deviceJoined({device: devices.bulb}); + await flushPromises(); + expect(mockWSClient.send).toHaveBeenCalledWith( + stringify({payload: {data: {friendly_name: "bulb", ieee_address: "0x000b57fffec6a5b2"}, type: "device_joined"}, topic: "bridge/event"}), + ); + + // Should send JSON state event when `output: attribute` + mockWSClient.send.mockClear(); + settings.set(["advanced", "output"], "attribute"); + await mockMQTTEvents.message("zigbee2mqtt/bulb_color/set", stringify({brightness: 90})); + await flushPromises(); + expect(mockWSClient.send).toHaveBeenCalledTimes(1); + expect(mockWSClient.send).toHaveBeenCalledWith( + stringify({ + topic: "bulb_color", + payload: {state: "ON", brightness: 90}, }), );