Clean up events and subscriptions

This commit is contained in:
Victor Hagelbäck 2021-02-20 15:33:06 +01:00
parent 4d7de61e42
commit ca7a5cdd57
6 changed files with 117 additions and 51 deletions

View file

@ -34,14 +34,20 @@ class DeviceRegistry {
this.deviceIdsBySerial[added.serialNumber] = added.id; this.deviceIdsBySerial[added.serialNumber] = added.id;
logger.verbose(`Added/updated device: ${JSON.stringify(added)}`); logger.verbose(
`Added/updated device: ${JSON.stringify(added)}. ${
Object.keys(this.plejdDevices).length
} plejd devices in total.`,
);
if (added.roomId) { if (added.roomId) {
this.deviceIdsByRoom[added.roomId] = [ this.deviceIdsByRoom[added.roomId] = [
...(this.deviceIdsByRoom[added.roomId] || []), ...(this.deviceIdsByRoom[added.roomId] || []),
added.id, added.id,
]; ];
logger.verbose(`Added to room: ${JSON.stringify(this.deviceIdsByRoom[added.roomId])}`); logger.verbose(
`Added to room #${added.roomId}: ${JSON.stringify(this.deviceIdsByRoom[added.roomId])}`,
);
} }
return added; return added;
@ -54,10 +60,14 @@ class DeviceRegistry {
}; };
this.roomDevices = { this.roomDevices = {
...this.roomDevices, ...this.roomDevices,
[device.id]: added, [added.id]: added,
}; };
logger.verbose(`Added/updated room device: ${JSON.stringify(added)}`); logger.verbose(
`Added/updated room device: ${JSON.stringify(added)}. ${
Object.keys(this.roomDevices).length
} room devices total.`,
);
return added; return added;
} }
@ -68,9 +78,13 @@ class DeviceRegistry {
}; };
this.sceneDevices = { this.sceneDevices = {
...this.sceneDevices, ...this.sceneDevices,
added, [added.id]: added,
}; };
logger.verbose(`Added/updated scene: ${JSON.stringify(added)}`); logger.verbose(
`Added/updated scene: ${JSON.stringify(added)}. ${
Object.keys(this.sceneDevices).length
} scenes in total.`,
);
return added; return added;
} }
@ -130,7 +144,8 @@ class DeviceRegistry {
} }
setState(deviceId, state, dim) { setState(deviceId, state, dim) {
const device = this.addPlejdDevice({ id: deviceId, state }); const device = this.getDevice(deviceId) || this.addPlejdDevice({ id: deviceId });
device.state = state;
if (dim && device.dimmable) { if (dim && device.dimmable) {
device.dim = dim; device.dim = dim;
} }

View file

@ -70,6 +70,11 @@ const getSwitchPayload = (device) => ({
class MqttClient extends EventEmitter { class MqttClient extends EventEmitter {
deviceRegistry; deviceRegistry;
static EVENTS = {
connected: 'connected',
stateChanged: 'stateChanged',
};
constructor(deviceRegistry) { constructor(deviceRegistry) {
super(); super();
@ -97,7 +102,7 @@ class MqttClient extends EventEmitter {
logger.error('Unable to subscribe to status topics'); logger.error('Unable to subscribe to status topics');
} }
this.emit('connected'); this.emit(MqttClient.EVENTS.connected);
}); });
this.client.subscribe(getSubscribePath(), (err) => { this.client.subscribe(getSubscribePath(), (err) => {
@ -115,7 +120,7 @@ class MqttClient extends EventEmitter {
this.client.on('message', (topic, message) => { this.client.on('message', (topic, message) => {
if (startTopics.includes(topic)) { if (startTopics.includes(topic)) {
logger.info('Home Assistant has started. lets do discovery.'); logger.info('Home Assistant has started. lets do discovery.');
this.emit('connected'); this.emit(MqttClient.EVENTS.connected);
} else { } else {
const decodedTopic = decodeTopic(topic); const decodedTopic = decodeTopic(topic);
if (decodedTopic) { if (decodedTopic) {
@ -148,7 +153,7 @@ class MqttClient extends EventEmitter {
); );
if (device) { if (device) {
this.emit('stateChanged', device, command); this.emit(MqttClient.EVENTS.stateChanged, device, command);
} else { } else {
logger.warn( logger.warn(
`Device for topic ${topic} not found! Can happen if HA calls previously existing devices.`, `Device for topic ${topic} not found! Can happen if HA calls previously existing devices.`,
@ -182,6 +187,10 @@ class MqttClient extends EventEmitter {
this.client.reconnect(); this.client.reconnect();
} }
cleanup() {
this.client.removeAllListeners();
}
disconnect(callback) { disconnect(callback) {
this.deviceRegistry.allDevices.forEach((device) => { this.deviceRegistry.allDevices.forEach((device) => {
this.client.publish(getAvailabilityTopic(device), 'offline'); this.client.publish(getAvailabilityTopic(device), 'offline');

View file

@ -17,6 +17,7 @@ class PlejdAddon extends EventEmitter {
plejdApi; plejdApi;
plejdDeviceCommunication; plejdDeviceCommunication;
mqttClient; mqttClient;
processCleanupFunc;
sceneManager; sceneManager;
constructor() { constructor() {
@ -31,19 +32,30 @@ class PlejdAddon extends EventEmitter {
this.mqttClient = new MqttClient(this.deviceRegistry); this.mqttClient = new MqttClient(this.deviceRegistry);
} }
cleanup() {
this.mqttClient.cleanup();
this.mqttClient.removeAllListeners();
this.plejdDeviceCommunication.cleanup();
this.plejdDeviceCommunication.removeAllListeners();
}
async init() { async init() {
logger.info('Main Plejd addon init()...'); logger.info('Main Plejd addon init()...');
await this.plejdApi.init(); await this.plejdApi.init();
this.sceneManager.init(); this.sceneManager.init();
this.processCleanupFunc = () => {
this.cleanup();
this.processCleanupFunc = () => {};
this.mqttClient.disconnect(() => process.exit(0));
};
['SIGINT', 'SIGHUP', 'SIGTERM'].forEach((signal) => { ['SIGINT', 'SIGHUP', 'SIGTERM'].forEach((signal) => {
process.on(signal, () => { process.on(signal, this.processCleanupFunc);
this.mqttClient.disconnect(() => process.exit(0));
});
}); });
this.mqttClient.on('connected', () => { this.mqttClient.on(MqttClient.EVENTS.connected, () => {
try { try {
logger.verbose('connected to mqtt.'); logger.verbose('connected to mqtt.');
this.mqttClient.sendDiscoveryToHomeAssistant(); this.mqttClient.sendDiscoveryToHomeAssistant();
@ -53,7 +65,7 @@ class PlejdAddon extends EventEmitter {
}); });
// subscribe to changes from HA // subscribe to changes from HA
this.mqttClient.on('stateChanged', (device, command) => { this.mqttClient.on(MqttClient.EVENTS.stateChanged, (device, command) => {
try { try {
const deviceId = device.id; const deviceId = device.id;
@ -99,21 +111,27 @@ class PlejdAddon extends EventEmitter {
this.mqttClient.init(); this.mqttClient.init();
// subscribe to changes from Plejd // subscribe to changes from Plejd
this.plejdDeviceCommunication.on('stateChanged', (deviceId, command) => { this.plejdDeviceCommunication.on(
try { PlejdDeviceCommunication.EVENTS.stateChanged,
this.mqttClient.updateState(deviceId, command); (deviceId, command) => {
} catch (err) { try {
logger.error('Error in PlejdService.stateChanged callback in main.js', err); this.mqttClient.updateState(deviceId, command);
} } catch (err) {
}); logger.error('Error in PlejdService.stateChanged callback in main.js', err);
}
},
);
this.plejdDeviceCommunication.on('sceneTriggered', (deviceId, sceneId) => { this.plejdDeviceCommunication.on(
try { PlejdDeviceCommunication.EVENTS.sceneTriggered,
this.mqttClient.sceneTriggered(sceneId); (deviceId, sceneId) => {
} catch (err) { try {
logger.error('Error in PlejdService.sceneTriggered callback in main.js', err); this.mqttClient.sceneTriggered(sceneId);
} } catch (err) {
}); logger.error('Error in PlejdService.sceneTriggered callback in main.js', err);
}
},
);
await this.plejdDeviceCommunication.init(); await this.plejdDeviceCommunication.init();
logger.info('Main init done'); logger.info('Main init done');

View file

@ -62,6 +62,8 @@ class PlejBLEHandler extends EventEmitter {
connected: 'connected', connected: 'connected',
reconnecting: 'reconnecting', reconnecting: 'reconnecting',
commandReceived: 'commandReceived', commandReceived: 'commandReceived',
writeFailed: 'writeFailed',
writeSuccess: 'writeSuccess',
}; };
constructor(deviceRegistry) { constructor(deviceRegistry) {
@ -80,14 +82,30 @@ class PlejBLEHandler extends EventEmitter {
auth: null, auth: null,
ping: null, ping: null,
}; };
}
this.on('writeFailed', (error) => this._onWriteFailed(error)); cleanup() {
this.on('writeSuccess', () => this._onWriteSuccess()); this.removeAllListeners(PlejBLEHandler.EVENTS.writeFailed);
this.removeAllListeners(PlejBLEHandler.EVENTS.writeSuccess);
if (this.bus) {
this.bus.removeAllListeners('error');
this.bus.removeAllListeners('connect');
}
if (this.characteristics.lastDataProperties) {
this.characteristics.lastDataProperties.removeAllListeners('PropertiesChanged');
}
if (this.objectManager) {
this.objectManager.removeAllListeners('InterfacesAdded');
}
} }
async init() { async init() {
logger.info('init()'); logger.info('init()');
this.on(PlejBLEHandler.EVENTS.writeFailed, (error) => this._onWriteFailed(error));
this.on(PlejBLEHandler.EVENTS.writeSuccess, () => this._onWriteSuccess());
this.bus = dbus.systemBus(); this.bus = dbus.systemBus();
this.bus.on('error', (err) => { this.bus.on('error', (err) => {
// Uncaught error events will show UnhandledPromiseRejection logs // Uncaught error events will show UnhandledPromiseRejection logs
@ -276,6 +294,7 @@ class PlejBLEHandler extends EventEmitter {
async _getInterface() { async _getInterface() {
const bluez = await this.bus.getProxyObject(BLUEZ_SERVICE_NAME, '/'); const bluez = await this.bus.getProxyObject(BLUEZ_SERVICE_NAME, '/');
this.objectManager = await bluez.getInterface(DBUS_OM_INTERFACE); this.objectManager = await bluez.getInterface(DBUS_OM_INTERFACE);
// We need to find the ble interface which implements the Adapter1 interface // We need to find the ble interface which implements the Adapter1 interface
@ -404,13 +423,12 @@ class PlejBLEHandler extends EventEmitter {
async _onInterfacesAdded(path, interfaces) { async _onInterfacesAdded(path, interfaces) {
logger.silly(`Interface added ${path}, inspecting...`); logger.silly(`Interface added ${path}, inspecting...`);
// const [adapter, dev, service, characteristic] = path.split('/').slice(3);
const interfaceKeys = Object.keys(interfaces); const interfaceKeys = Object.keys(interfaces);
if (interfaceKeys.indexOf(BLUEZ_DEVICE_ID) > -1) { if (interfaceKeys.indexOf(BLUEZ_DEVICE_ID) > -1) {
if (interfaces[BLUEZ_DEVICE_ID].UUIDs.value.indexOf(PLEJD_SERVICE) > -1) { if (interfaces[BLUEZ_DEVICE_ID].UUIDs.value.indexOf(PLEJD_SERVICE) > -1) {
logger.debug(`Found Plejd service on ${path}`); logger.debug(`Found Plejd service on ${path}`);
this.objectManager.removeAllListeners('InterfacesAdded');
await this._initDiscoveredPlejdDevice(path); await this._initDiscoveredPlejdDevice(path);
} else { } else {
logger.error('Uh oh, no Plejd device!'); logger.error('Uh oh, no Plejd device!');
@ -452,6 +470,7 @@ class PlejBLEHandler extends EventEmitter {
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
try { try {
this.cleanup();
await delay(5000); await delay(5000);
this.emit(PlejBLEHandler.EVENTS.reconnecting); this.emit(PlejBLEHandler.EVENTS.reconnecting);
logger.info('Reconnecting BLE...'); logger.info('Reconnecting BLE...');

View file

@ -30,22 +30,31 @@ class PlejdDeviceCommunication extends EventEmitter {
this.plejdBleHandler = new PlejBLEHandler(deviceRegistry); this.plejdBleHandler = new PlejBLEHandler(deviceRegistry);
this.config = Configuration.getOptions(); this.config = Configuration.getOptions();
this.deviceRegistry = deviceRegistry; this.deviceRegistry = deviceRegistry;
}
// eslint-disable-next-line max-len cleanup() {
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.commandReceived, (deviceId, command, data) => this._bleCommandReceived(deviceId, command, data)); this.plejdBleHandler.cleanup();
this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.commandReceived);
this.plejdBleHandler.on('connected', () => { this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.connected);
logger.info('Bluetooth connected. Plejd BLE up and running!'); this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.reconnecting);
this.startWriteQueue();
});
this.plejdBleHandler.on('reconnecting', () => {
logger.info('Bluetooth reconnecting...');
clearTimeout(this.writeQueueRef);
});
} }
async init() { async init() {
try { try {
// eslint-disable-next-line max-len
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.commandReceived, (deviceId, command, data) => this._bleCommandReceived(deviceId, command, data));
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.connected, () => {
logger.info('Bluetooth connected. Plejd BLE up and running!');
logger.verbose('Starting writeQueue loop.');
this.startWriteQueue();
});
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.reconnecting, () => {
logger.info('Bluetooth reconnecting...');
logger.verbose('Stopping writeQueue loop until connection is established.');
clearTimeout(this.writeQueueRef);
});
await this.plejdBleHandler.init(); await this.plejdBleHandler.init();
} catch (err) { } catch (err) {
logger.error('Failed init() of BLE. Starting reconnect loop.'); logger.error('Failed init() of BLE. Starting reconnect loop.');
@ -108,9 +117,8 @@ class PlejdDeviceCommunication extends EventEmitter {
} }
_transitionTo(deviceId, targetBrightness, transition, deviceName) { _transitionTo(deviceId, targetBrightness, transition, deviceName) {
const initialBrightness = this.plejdDevices[deviceId] const device = this.deviceRegistry.getDevice(deviceId);
? this.plejdDevices[deviceId].state && this.plejdDevices[deviceId].dim const initialBrightness = device ? device.state && device.dim : null;
: null;
this._clearDeviceTransitionTimer(deviceId); this._clearDeviceTransitionTimer(deviceId);
const isDimmable = this.deviceRegistry.getDevice(deviceId).dimmable; const isDimmable = this.deviceRegistry.getDevice(deviceId).dimmable;

View file

@ -1,16 +1,13 @@
const EventEmitter = require('events');
const Logger = require('./Logger'); const Logger = require('./Logger');
const Scene = require('./Scene'); const Scene = require('./Scene');
const logger = Logger.getLogger('scene-manager'); const logger = Logger.getLogger('scene-manager');
class SceneManager extends EventEmitter { class SceneManager {
deviceRegistry; deviceRegistry;
plejdBle; plejdBle;
scenes; scenes;
constructor(deviceRegistry, plejdBle) { constructor(deviceRegistry, plejdBle) {
super();
this.deviceRegistry = deviceRegistry; this.deviceRegistry = deviceRegistry;
this.plejdBle = plejdBle; this.plejdBle = plejdBle;
this.scenes = {}; this.scenes = {};