From ddb83cc6e449d2cbebce24e9ced1d3ae4ff7dc0d Mon Sep 17 00:00:00 2001 From: Koenkk Date: Tue, 23 Oct 2018 20:39:48 +0200 Subject: [PATCH] Handle zigbee commands sequentially (and refactor it). #529 --- lib/controller.js | 129 +++------------------------ lib/extension/deviceCommand.js | 157 +++++++++++++++++++++++++++++++++ lib/extension/networkMap.js | 2 + lib/util/settings.js | 1 + lib/zigbee.js | 4 +- npm-shrinkwrap.json | 10 ++- package.json | 1 + 7 files changed, 184 insertions(+), 120 deletions(-) create mode 100644 lib/extension/deviceCommand.js diff --git a/lib/controller.js b/lib/controller.js index 9e523fd3..23b28fd1 100644 --- a/lib/controller.js +++ b/lib/controller.js @@ -6,14 +6,12 @@ const settings = require('./util/settings'); const ExtensionNetworkMap = require('./extension/networkMap'); const ExtensionSoftReset = require('./extension/softReset'); const ExtensionRouterPollXiaomi = require('./extension/routerPollXiaomi'); +const ExtensionDeviceCommand = require('./extension/deviceCommand'); const zigbeeShepherdConverters = require('zigbee-shepherd-converters'); const homeassistant = require('./homeassistant'); const objectAssignDeep = require('object-assign-deep'); const mqttConfigRegex = new RegExp(`${settings.get().mqtt.base_topic}/bridge/config/\\w+`, 'g'); -const mqttDeviceRegex = new RegExp(`${settings.get().mqtt.base_topic}/[\\w\\s\\d.-]+/set`, 'g'); -const mqttDevicePrefixRegex = new RegExp(`${settings.get().mqtt.base_topic}/[\\w\\s\\d.-]+/[\\w\\s\\d.-]+/set`, 'g'); - const allowedLogLevels = ['error', 'warn', 'info', 'debug']; @@ -67,8 +65,6 @@ class Controller { // Connect to MQTT broker const subscriptions = [ - `${settings.get().mqtt.base_topic}/+/set`, - `${settings.get().mqtt.base_topic}/+/+/set`, `${settings.get().mqtt.base_topic}/bridge/config/+`, ]; @@ -96,9 +92,10 @@ class Controller { // Initialize extensions. this.extensions = [ - new ExtensionNetworkMap(this.zigbee, this.mqtt, this.state), - new ExtensionSoftReset(this.zigbee, this.mqtt, this.state), - new ExtensionRouterPollXiaomi(this.zigbee, this.mqtt, this.state), + new ExtensionDeviceCommand(this.zigbee, this.mqtt, this.state, this.mqttPublishDeviceState), + new ExtensionNetworkMap(this.zigbee, this.mqtt, this.state, this.mqttPublishDeviceState), + new ExtensionSoftReset(this.zigbee, this.mqtt, this.state, this.mqttPublishDeviceState), + new ExtensionRouterPollXiaomi(this.zigbee, this.mqtt, this.state, this.mqttPublishDeviceState), ]; // Resend all cached states. @@ -320,18 +317,15 @@ class Controller { } handleMQTTMessage(topic, message) { - logger.debug(`Received mqtt message on topic '${topic}' with data '${message}'`); + logger.debug(`Received MQTT message on '${topic}' with data '${message}'`); - // Find extensions that could handle this. - const extensions = this.extensions.filter((e) => e.handleMQTTMessage); - - // Call extensions. - const extensionResults = extensions.map((e) => e.handleMQTTMessage(topic, message)); + // Find extensions that can handle MQTT messages and get results + const results = this.extensions + .filter((e) => e.handleMQTTMessage) + .map((e) => e.handleMQTTMessage(topic, message)); if (topic.match(mqttConfigRegex)) { this.handleMQTTMessageConfig(topic, message); - } else if (topic.match(mqttDeviceRegex) || topic.match(mqttDevicePrefixRegex)) { - this.handleMQTTMessageDevice(topic, message, topic.match(mqttDevicePrefixRegex)); } else if (topic === 'hass/status') { if (message.toString().toLowerCase() === 'online') { const timer = setTimeout(() => { @@ -339,8 +333,8 @@ class Controller { clearTimeout(timer); }, 20000); } - } else if (!extensionResults.includes(true)) { - logger.warn(`Cannot handle MQTT message with topic '${topic}' and message '${message}'`); + } else if (!results.includes(true)) { + logger.warn(`Cannot handle MQTT message on '${topic}' with data '${message}'`); } } @@ -448,105 +442,6 @@ class Controller { } } - handleMQTTMessageDevice(topic, message, withPrefix) { - const friendlyName = topic.split('/').slice(withPrefix ? -3 : -2)[0]; - const topicPrefix = withPrefix ? topic.split('/').slice(-2)[0] : ''; - - // Map friendlyName to deviceID. - const deviceID = settings.getIDByFriendlyName(friendlyName); - - if (!deviceID) { - logger.error(`Cannot handle '${topic}' because deviceID of '${friendlyName}' cannot be found`); - return; - } - - // Convert the MQTT message to a Zigbee message. - let json = null; - try { - json = JSON.parse(message); - } catch (e) { - // Cannot be parsed to JSON, assume state message. - json = {state: message.toString()}; - } - - // Find ep for this device - const device = this.zigbee.getDevice(deviceID); - if (!device) { - logger.error(`Failed to find device with deviceID ${deviceID}`); - return; - } - - const mappedModel = zigbeeShepherdConverters.findByZigbeeModel(device.modelId); - if (!mappedModel) { - logger.warn(`Device with modelID '${device.modelId}' is not supported.`); - logger.warn(`Please see: https://github.com/Koenkk/zigbee2mqtt/wiki/How-to-support-new-devices`); - return; - } - - const ep = mappedModel.ep && mappedModel.ep[topicPrefix] ? mappedModel.ep[topicPrefix] : null; - const published = []; - - Object.keys(json).forEach((key) => { - // Find converter for this key. - const converter = mappedModel.toZigbee.find((c) => c.key === key); - - if (!converter) { - logger.error(`No converter available for '${key}' (${json[key]})`); - return; - } - - const message = converter.convert(json[key], json); - - if (!message) { - return; - } - - const callback = (error) => { - // Devices do not report when they go off, this ensures state (on/off) is always in sync. - if (!error && (key.startsWith('state') || key === 'brightness')) { - const msg = {}; - const _key = topicPrefix ? `state_${topicPrefix}` : 'state'; - msg[_key] = key === 'brightness' ? 'ON' : json['state']; - this.mqttPublishDeviceState(device, msg, true); - } - }; - - this.zigbee.publish(deviceID, message.cid, message.cmd, message.zclData, - message.cfg, ep, message.type, callback); - - published.push({message: message, converter: converter}); - }); - - /** - * After publishing a command to a zigbee device we want to monitor the changed attribute(s) so that - * everything stays in sync. - */ - published.forEach((p) => { - let counter = 0; - let secondsToMonitor = 1; - - // In case of a transition we need to monitor for the whole transition time. - if (p.message.zclData.hasOwnProperty('transtime')) { - // Note that: transtime 10 = 0.1 seconds, 100 = 1 seconds, etc. - secondsToMonitor = (p.message.zclData.transtime / 10) + 1; - } - - const timer = setInterval(() => { - counter++; - - // Doing a 'read' will result in the device sending a zigbee message with the current attribute value. - // which will be handled by this.handleZigbeeMessage. - p.converter.attr.forEach((attribute) => { - this.zigbee.read(deviceID, p.message.cid, attribute, ep, () => null); - }); - - if (counter >= secondsToMonitor) { - clearTimeout(timer); - } - }, 1000); - }); - } - mqttPublishDeviceState(device, payload, cache) { const deviceID = device.ieeeAddr; const appSettings = settings.get(); diff --git a/lib/extension/deviceCommand.js b/lib/extension/deviceCommand.js new file mode 100644 index 00000000..7ab36eaa --- /dev/null +++ b/lib/extension/deviceCommand.js @@ -0,0 +1,157 @@ + +const settings = require('../util/settings'); +const zigbeeShepherdConverters = require('zigbee-shepherd-converters'); +const Queue = require('queue'); +const logger = require('../util/logger'); + +const setTopic = new RegExp(`${settings.get().mqtt.base_topic}/[\\w\\s\\d.-]+/set`, 'g'); +const setWithPrefixTopic = new RegExp(`${settings.get().mqtt.base_topic}/[\\w\\s\\d.-]+/[\\w\\s\\d.-]+/set`, 'g'); + +class DeviceCommand { + constructor(zigbee, mqtt, state, mqttPublishDeviceState) { + this.zigbee = zigbee; + this.mqtt = mqtt; + this.state = state; + + // TODO -> remove this; move to publish device state method to mqtt.js + this.mqttPublishDeviceState = mqttPublishDeviceState; + + /** + * Setup command queue. + * The command queue ensures that only 1 command is executed at a time. + * When executing multiple commands at the same time, some commands may fail. + */ + this.queue = new Queue(); + this.queue.concurrency = 1; + this.queue.autostart = true; + + // Subscribe to topics. + this.mqtt.subscribe(`${settings.get().mqtt.base_topic}/+/set`); + this.mqtt.subscribe(`${settings.get().mqtt.base_topic}/+/+/set`); + } + + stop() { + this.queue.stop(); + } + + handleMQTTMessage(topic, message) { + if (!topic.match(setTopic) && !topic.match(setWithPrefixTopic)) { + // Can't handle this message + return false; + } + + // Parse topic + const hasPrefix = topic.match(setWithPrefixTopic); + const friendlyName = topic.split('/').slice(hasPrefix ? -3 : -2)[0]; + const prefix = hasPrefix ? topic.split('/').slice(-2)[0] : ''; + + // Map friendlyName to ieeeAddr. + const ieeeAddr = settings.getIeeAddrByFriendlyName(friendlyName); + if (!ieeeAddr) { + logger.error(`Cannot handle '${topic}' because ieeAddr of '${friendlyName}' cannot be found`); + return; + } + + // Get device + const device = this.zigbee.getDevice(ieeeAddr); + if (!device) { + logger.error(`Failed to find device with ieeAddr: '${ieeeAddr}'`); + return; + } + + // Map device to a model + const model = zigbeeShepherdConverters.findByZigbeeModel(device.modelId); + if (!model) { + logger.warn(`Device with modelID '${device.modelId}' is not supported.`); + logger.warn(`Please see: https://github.com/Koenkk/zigbee2mqtt/wiki/How-to-support-new-devices`); + return; + } + + // Convert the MQTT message to a Zigbee message. + let json = null; + try { + json = JSON.parse(message); + } catch (e) { + // Cannot be parsed to JSON, assume state message. + json = {state: message.toString()}; + } + + // Determine endpoint to publish to. + const endpoint = model.hasOwnProperty('ep') && model.ep.hasOwnProperty(prefix) ? model.ep[prefix] : null; + + // For each key in the JSON message find the matching converter. + Object.keys(json).forEach((key) => { + const converter = model.toZigbee.find((c) => c.key === key); + if (!converter) { + logger.error(`No converter available for '${key}' (${json[key]})`); + return; + } + + // Converter didn't return a result, skip + const converted = converter.convert(json[key], json); + if (!converted) { + return; + } + + // Add job to queue + this.queue.push((queueCallback) => { + this.zigbee.publish( + ieeeAddr, + converted.cid, + converted.cmd, + converted.zclData, + converted.cfg, + endpoint, + converted.type, + (error) => { + // Devices do not report when they go off, this ensures state (on/off) is always in sync. + if (!error && (key.startsWith('state') || key === 'brightness')) { + const msg = {}; + const _key = prefix ? `state_${prefix}` : 'state'; + msg[_key] = key === 'brightness' ? 'ON' : json['state']; + this.mqttPublishDeviceState(device, msg, true); + } + + queueCallback(); + } + ); + }); + }); + + return true; + + // TODO + // Is this still needed?????? + /** + * After publishing a command to a zigbee device we want to monitor the changed attribute(s) so that + * everything stays in sync. + */ + // published.forEach((p) => { + // let counter = 0; + // let secondsToMonitor = 1; + + // // In case of a transition we need to monitor for the whole transition time. + // if (p.message.zclData.hasOwnProperty('transtime')) { + // // Note that: transtime 10 = 0.1 seconds, 100 = 1 seconds, etc. + // secondsToMonitor = (p.message.zclData.transtime / 10) + 1; + // } + + // const timer = setInterval(() => { + // counter++; + + // // Doing a 'read' will result in the device sending a zigbee message with the + // //current attribute value. + // // which will be handled by this.handleZigbeeMessage. + // p.converter.attr.forEach((attribute) => { + // this.zigbee.read(deviceID, p.message.cid, attribute, ep, () => null); + // }); + + // if (counter >= secondsToMonitor) { + // clearTimeout(timer); + // } + // }, 1000); + // }); + } +} + +module.exports = DeviceCommand; diff --git a/lib/extension/networkMap.js b/lib/extension/networkMap.js index 5dc6fc0a..76334899 100644 --- a/lib/extension/networkMap.js +++ b/lib/extension/networkMap.js @@ -30,6 +30,8 @@ class NetworkMap { return true; } + + return false; } raw(zigbee, topology) { diff --git a/lib/util/settings.js b/lib/util/settings.js index 42cf48e6..b64fe398 100644 --- a/lib/util/settings.js +++ b/lib/util/settings.js @@ -75,4 +75,5 @@ module.exports = { removeDevice: (id) => removeDevice(id), getIDByFriendlyName: (friendlyName) => getIDByFriendlyName(friendlyName), changeFriendlyName: (old, new_) => changeFriendlyName(old, new_), + getIeeAddrByFriendlyName: (friendlyName) => getIDByFriendlyName(friendlyName), }; diff --git a/lib/zigbee.js b/lib/zigbee.js index a605c5ef..7d433098 100644 --- a/lib/zigbee.js +++ b/lib/zigbee.js @@ -177,8 +177,8 @@ class Zigbee { return this.shepherd.list(); } - getDevice(deviceID) { - return this.getDevices().find((d) => d.ieeeAddr === deviceID); + getDevice(ieeeAddr) { + return this.getDevices().find((d) => d.ieeeAddr === ieeeAddr); } getCoordinator() { diff --git a/npm-shrinkwrap.json b/npm-shrinkwrap.json index e082c726..5b79df31 100644 --- a/npm-shrinkwrap.json +++ b/npm-shrinkwrap.json @@ -3544,7 +3544,7 @@ }, "pify": { "version": "2.3.0", - "resolved": "https://registry.npmjs.org/pify/-/pify-2.3.0.tgz", + "resolved": "http://registry.npmjs.org/pify/-/pify-2.3.0.tgz", "integrity": "sha1-7RQaasBDqEnqWISY59yosVMw6Qw=", "dev": true }, @@ -3844,6 +3844,14 @@ "resolved": "https://registry.npmjs.org/q/-/q-1.5.1.tgz", "integrity": "sha1-fjL3W0E4EpHQRhHxvxQQmsAGUdc=" }, + "queue": { + "version": "4.5.0", + "resolved": "https://registry.npmjs.org/queue/-/queue-4.5.0.tgz", + "integrity": "sha512-DwxpAnqJuoQa+wyDgQuwkSshkhlqIlWEvwvdAY27fDPunZ2cVJzXU4JyjY+5l7zs7oGLaYAQm4MbLOVFAHFBzA==", + "requires": { + "inherits": "~2.0.0" + } + }, "radio-symbol": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/radio-symbol/-/radio-symbol-2.0.0.tgz", diff --git a/package.json b/package.json index da05bf60..8cc8d151 100644 --- a/package.json +++ b/package.json @@ -38,6 +38,7 @@ "moment": "*", "mqtt": "*", "object-assign-deep": "*", + "queue": "*", "rimraf": "*", "winston": "2.4.2", "zcl-packet": "git+https://github.com/Koenkk/zcl-packet.git#fbd8c936bbd4be0597ad3e934be0ca722b0128a6",