From 75b9a1a8d72d3778d6fa026e9be003e0b786bead Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Victor=20Hagelb=C3=A4ck?= Date: Mon, 1 Feb 2021 21:19:22 +0100 Subject: [PATCH] Clarify MqttClient structure, logging, and start using DeviceRegistry --- plejd/MqttClient.js | 118 +++++++++++++++++++++++++++----------------- 1 file changed, 73 insertions(+), 45 deletions(-) diff --git a/plejd/MqttClient.js b/plejd/MqttClient.js index f0ceca5..88b877d 100644 --- a/plejd/MqttClient.js +++ b/plejd/MqttClient.js @@ -1,5 +1,7 @@ const EventEmitter = require('events'); const mqtt = require('mqtt'); + +const Configuration = require('./Configuration'); const Logger = require('./Logger'); const startTopics = ['hass/status', 'homeassistant/status']; @@ -19,6 +21,18 @@ const getAvailabilityTopic = (plug) => `${getPath(plug)}/availability`; const getCommandTopic = (plug) => `${getPath(plug)}/set`; const getSceneEventTopic = () => 'plejd/event/scene'; +const decodeTopicRegexp = new RegExp( + /(?[^[]+)\/(?.+)\/plejd\/(?.+)\/(?config|state|availability|set|scene)/, +); + +const decodeTopic = (topic) => { + const matches = decodeTopicRegexp.exec(topic); + if (!matches) { + return null; + } + return matches.groups; +}; + const getDiscoveryPayload = (device) => ({ schema: 'json', name: device.name, @@ -54,23 +68,21 @@ const getSwitchPayload = (device) => ({ // #endregion class MqttClient extends EventEmitter { - constructor(mqttBroker, username, password) { + deviceRegistry; + + constructor(deviceRegistry) { super(); - this.mqttBroker = mqttBroker; - this.username = username; - this.password = password; - this.deviceMap = {}; - this.devices = []; + this.config = Configuration.getOptions(); + this.deviceRegistry = deviceRegistry; } init() { logger.info('Initializing MQTT connection for Plejd addon'); - const self = this; - this.client = mqtt.connect(this.mqttBroker, { - username: this.username, - password: this.password, + this.client = mqtt.connect(this.config.mqttBroker, { + username: this.config.mqttUsername, + password: this.config.mqttPassword, }); this.client.on('connect', () => { @@ -81,7 +93,7 @@ class MqttClient extends EventEmitter { logger.error('Unable to subscribe to status topics'); } - self.emit('connected'); + this.emit('connected'); }); this.client.subscribe(getSubscribePath(), (err) => { @@ -93,32 +105,52 @@ class MqttClient extends EventEmitter { this.client.on('close', () => { logger.verbose('Warning: mqtt channel closed event, reconnecting...'); - self.reconnect(); + this.reconnect(); }); this.client.on('message', (topic, message) => { // const command = message.toString(); - const command = message.toString().substring(0, 1) === '{' - ? JSON.parse(message.toString()) - : message.toString(); + const command = + message.toString().substring(0, 1) === '{' + ? JSON.parse(message.toString()) + : message.toString(); if (startTopics.includes(topic)) { logger.info('Home Assistant has started. lets do discovery.'); - self.emit('connected'); - } else if (topic.includes('set')) { - logger.verbose(`Got mqtt command on ${topic} - ${message}`); - const device = self.devices.find((x) => getCommandTopic(x) === topic); - if (device) { - self.emit('stateChanged', device, command); - } else { - logger.warn( - `Device for topic ${topic} not found! Can happen if HA calls previously existing devices.`, - ); - } - } else if (topic.includes('state')) { - logger.verbose(`State update sent over mqtt to HA ${topic} - ${message}`); + this.emit('connected'); } else { - logger.verbose(`Warning: Got unrecognized mqtt command on ${topic} - ${message}`); + const decodedTopic = decodeTopic(topic); + if (decodedTopic) { + const device = this.deviceRegistry.getDevice(decodedTopic.id); + const deviceName = device ? device.name : ''; + + switch (decodedTopic.command) { + case 'set': + logger.verbose( + `Got mqtt SET command for ${decodedTopic.type}, ${deviceName} (${decodedTopic.id}): ${message}`, + ); + + if (device) { + this.emit('stateChanged', device, command); + } else { + logger.warn( + `Device for topic ${topic} not found! Can happen if HA calls previously existing devices.`, + ); + } + break; + case 'state': + case 'config': + case 'availability': + logger.verbose( + `Sent mqtt ${decodedTopic.command} command for ${decodedTopic.type}, ${deviceName} (${decodedTopic.id}). ${decodedTopic.command === 'availability' ? message : ''}`, + ); + break; + default: + logger.verbose(`Warning: Unknown command ${decodedTopic.command} in decoded topic`); + } + } else { + logger.verbose(`Warning: Got unrecognized mqtt command on '${topic}': ${message}`); + } } }); } @@ -128,37 +160,33 @@ class MqttClient extends EventEmitter { } disconnect(callback) { - this.devices.forEach((device) => { + this.deviceRegistry.allDevices.forEach((device) => { this.client.publish(getAvailabilityTopic(device), 'offline'); }); this.client.end(callback); } - discover(devices) { - this.devices = devices; + sendDiscoveryToHomeAssistant() { + logger.debug(`Sending discovery of ${this.deviceRegistry.allDevices.length} device(s).`); - const self = this; - logger.debug(`Sending discovery of ${devices.length} device(s).`); - - devices.forEach((device) => { + this.deviceRegistry.allDevices.forEach((device) => { logger.debug(`Sending discovery for ${device.name}`); - const payload = device.type === 'switch' ? getSwitchPayload(device) : getDiscoveryPayload(device); + const payload = + device.type === 'switch' ? getSwitchPayload(device) : getDiscoveryPayload(device); logger.info( `Discovered ${device.type} (${device.typeName}) named ${device.name} with PID ${device.id}.`, ); - self.deviceMap[device.id] = payload.unique_id; - - self.client.publish(getConfigPath(device), JSON.stringify(payload)); + this.client.publish(getConfigPath(device), JSON.stringify(payload)); setTimeout(() => { - self.client.publish(getAvailabilityTopic(device), 'online'); + this.client.publish(getAvailabilityTopic(device), 'online'); }, 2000); }); } updateState(deviceId, data) { - const device = this.devices.find((x) => x.id === deviceId); + const device = this.deviceRegistry.getDevice(deviceId); if (!device) { logger.warn(`Unknown device id ${deviceId} - not handled by us.`); @@ -193,9 +221,9 @@ class MqttClient extends EventEmitter { this.client.publish(getAvailabilityTopic(device), 'online'); } - sceneTriggered(scene) { - logger.verbose(`Scene triggered: ${scene}`); - this.client.publish(getSceneEventTopic(), JSON.stringify({ scene })); + sceneTriggered(sceneId) { + logger.verbose(`Scene triggered: ${sceneId}`); + this.client.publish(getSceneEventTopic(), JSON.stringify({ scene: sceneId })); } }