Clarify MqttClient structure, logging, and start using DeviceRegistry

This commit is contained in:
Victor Hagelbäck 2021-02-01 21:19:22 +01:00
parent 4f51063c41
commit 75b9a1a8d7

View file

@ -1,5 +1,7 @@
const EventEmitter = require('events'); const EventEmitter = require('events');
const mqtt = require('mqtt'); const mqtt = require('mqtt');
const Configuration = require('./Configuration');
const Logger = require('./Logger'); const Logger = require('./Logger');
const startTopics = ['hass/status', 'homeassistant/status']; const startTopics = ['hass/status', 'homeassistant/status'];
@ -19,6 +21,18 @@ const getAvailabilityTopic = (plug) => `${getPath(plug)}/availability`;
const getCommandTopic = (plug) => `${getPath(plug)}/set`; const getCommandTopic = (plug) => `${getPath(plug)}/set`;
const getSceneEventTopic = () => 'plejd/event/scene'; const getSceneEventTopic = () => 'plejd/event/scene';
const decodeTopicRegexp = new RegExp(
/(?<prefix>[^[]+)\/(?<type>.+)\/plejd\/(?<id>.+)\/(?<command>config|state|availability|set|scene)/,
);
const decodeTopic = (topic) => {
const matches = decodeTopicRegexp.exec(topic);
if (!matches) {
return null;
}
return matches.groups;
};
const getDiscoveryPayload = (device) => ({ const getDiscoveryPayload = (device) => ({
schema: 'json', schema: 'json',
name: device.name, name: device.name,
@ -54,23 +68,21 @@ const getSwitchPayload = (device) => ({
// #endregion // #endregion
class MqttClient extends EventEmitter { class MqttClient extends EventEmitter {
constructor(mqttBroker, username, password) { deviceRegistry;
constructor(deviceRegistry) {
super(); super();
this.mqttBroker = mqttBroker; this.config = Configuration.getOptions();
this.username = username; this.deviceRegistry = deviceRegistry;
this.password = password;
this.deviceMap = {};
this.devices = [];
} }
init() { init() {
logger.info('Initializing MQTT connection for Plejd addon'); logger.info('Initializing MQTT connection for Plejd addon');
const self = this;
this.client = mqtt.connect(this.mqttBroker, { this.client = mqtt.connect(this.config.mqttBroker, {
username: this.username, username: this.config.mqttUsername,
password: this.password, password: this.config.mqttPassword,
}); });
this.client.on('connect', () => { this.client.on('connect', () => {
@ -81,7 +93,7 @@ class MqttClient extends EventEmitter {
logger.error('Unable to subscribe to status topics'); logger.error('Unable to subscribe to status topics');
} }
self.emit('connected'); this.emit('connected');
}); });
this.client.subscribe(getSubscribePath(), (err) => { this.client.subscribe(getSubscribePath(), (err) => {
@ -93,32 +105,52 @@ class MqttClient extends EventEmitter {
this.client.on('close', () => { this.client.on('close', () => {
logger.verbose('Warning: mqtt channel closed event, reconnecting...'); logger.verbose('Warning: mqtt channel closed event, reconnecting...');
self.reconnect(); this.reconnect();
}); });
this.client.on('message', (topic, message) => { this.client.on('message', (topic, message) => {
// const command = message.toString(); // const command = message.toString();
const command = message.toString().substring(0, 1) === '{' const command =
? JSON.parse(message.toString()) message.toString().substring(0, 1) === '{'
: message.toString(); ? JSON.parse(message.toString())
: message.toString();
if (startTopics.includes(topic)) { if (startTopics.includes(topic)) {
logger.info('Home Assistant has started. lets do discovery.'); logger.info('Home Assistant has started. lets do discovery.');
self.emit('connected'); this.emit('connected');
} else if (topic.includes('set')) {
logger.verbose(`Got mqtt command on ${topic} - ${message}`);
const device = self.devices.find((x) => getCommandTopic(x) === topic);
if (device) {
self.emit('stateChanged', device, command);
} else {
logger.warn(
`Device for topic ${topic} not found! Can happen if HA calls previously existing devices.`,
);
}
} else if (topic.includes('state')) {
logger.verbose(`State update sent over mqtt to HA ${topic} - ${message}`);
} else { } else {
logger.verbose(`Warning: Got unrecognized mqtt command on ${topic} - ${message}`); const decodedTopic = decodeTopic(topic);
if (decodedTopic) {
const device = this.deviceRegistry.getDevice(decodedTopic.id);
const deviceName = device ? device.name : '';
switch (decodedTopic.command) {
case 'set':
logger.verbose(
`Got mqtt SET command for ${decodedTopic.type}, ${deviceName} (${decodedTopic.id}): ${message}`,
);
if (device) {
this.emit('stateChanged', device, command);
} else {
logger.warn(
`Device for topic ${topic} not found! Can happen if HA calls previously existing devices.`,
);
}
break;
case 'state':
case 'config':
case 'availability':
logger.verbose(
`Sent mqtt ${decodedTopic.command} command for ${decodedTopic.type}, ${deviceName} (${decodedTopic.id}). ${decodedTopic.command === 'availability' ? message : ''}`,
);
break;
default:
logger.verbose(`Warning: Unknown command ${decodedTopic.command} in decoded topic`);
}
} else {
logger.verbose(`Warning: Got unrecognized mqtt command on '${topic}': ${message}`);
}
} }
}); });
} }
@ -128,37 +160,33 @@ class MqttClient extends EventEmitter {
} }
disconnect(callback) { disconnect(callback) {
this.devices.forEach((device) => { this.deviceRegistry.allDevices.forEach((device) => {
this.client.publish(getAvailabilityTopic(device), 'offline'); this.client.publish(getAvailabilityTopic(device), 'offline');
}); });
this.client.end(callback); this.client.end(callback);
} }
discover(devices) { sendDiscoveryToHomeAssistant() {
this.devices = devices; logger.debug(`Sending discovery of ${this.deviceRegistry.allDevices.length} device(s).`);
const self = this; this.deviceRegistry.allDevices.forEach((device) => {
logger.debug(`Sending discovery of ${devices.length} device(s).`);
devices.forEach((device) => {
logger.debug(`Sending discovery for ${device.name}`); logger.debug(`Sending discovery for ${device.name}`);
const payload = device.type === 'switch' ? getSwitchPayload(device) : getDiscoveryPayload(device); const payload =
device.type === 'switch' ? getSwitchPayload(device) : getDiscoveryPayload(device);
logger.info( logger.info(
`Discovered ${device.type} (${device.typeName}) named ${device.name} with PID ${device.id}.`, `Discovered ${device.type} (${device.typeName}) named ${device.name} with PID ${device.id}.`,
); );
self.deviceMap[device.id] = payload.unique_id; this.client.publish(getConfigPath(device), JSON.stringify(payload));
self.client.publish(getConfigPath(device), JSON.stringify(payload));
setTimeout(() => { setTimeout(() => {
self.client.publish(getAvailabilityTopic(device), 'online'); this.client.publish(getAvailabilityTopic(device), 'online');
}, 2000); }, 2000);
}); });
} }
updateState(deviceId, data) { updateState(deviceId, data) {
const device = this.devices.find((x) => x.id === deviceId); const device = this.deviceRegistry.getDevice(deviceId);
if (!device) { if (!device) {
logger.warn(`Unknown device id ${deviceId} - not handled by us.`); logger.warn(`Unknown device id ${deviceId} - not handled by us.`);
@ -193,9 +221,9 @@ class MqttClient extends EventEmitter {
this.client.publish(getAvailabilityTopic(device), 'online'); this.client.publish(getAvailabilityTopic(device), 'online');
} }
sceneTriggered(scene) { sceneTriggered(sceneId) {
logger.verbose(`Scene triggered: ${scene}`); logger.verbose(`Scene triggered: ${sceneId}`);
this.client.publish(getSceneEventTopic(), JSON.stringify({ scene })); this.client.publish(getSceneEventTopic(), JSON.stringify({ scene: sceneId }));
} }
} }