From 4d5a4ddbe350cc8e3c0ea65853a15c2aec9c21dc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Victor=20Hagelb=C3=A4ck?= Date: Mon, 22 Feb 2021 09:50:06 +0100 Subject: [PATCH] Stop write queue when BLE is not connected to avoid loosing commands - Set max retry count to 10 since BLE loop will set channel to disconnected after 5 retries (or known unrecoverable errors) --- plejd/DeviceRegistry.js | 8 +- plejd/MqttClient.js | 122 +++++++++++++++--------------- plejd/PlejdAddon.js | 8 +- plejd/PlejdBLEHandler.js | 14 ++-- plejd/PlejdDeviceCommunication.js | 28 ++++--- 5 files changed, 99 insertions(+), 81 deletions(-) diff --git a/plejd/DeviceRegistry.js b/plejd/DeviceRegistry.js index e9c45c8..a3612ac 100644 --- a/plejd/DeviceRegistry.js +++ b/plejd/DeviceRegistry.js @@ -41,10 +41,10 @@ class DeviceRegistry { ); if (added.roomId) { - this.deviceIdsByRoom[added.roomId] = [ - ...(this.deviceIdsByRoom[added.roomId] || []), - added.id, - ]; + const room = this.deviceIdsByRoom[added.roomId] || []; + if (!room.includes(added.roomId)) { + this.deviceIdsByRoom[added.roomId] = [...room, added.roomId]; + } logger.verbose( `Added to room #${added.roomId}: ${JSON.stringify(this.deviceIdsByRoom[added.roomId])}`, ); diff --git a/plejd/MqttClient.js b/plejd/MqttClient.js index 3571951..0369010 100644 --- a/plejd/MqttClient.js +++ b/plejd/MqttClient.js @@ -99,7 +99,7 @@ class MqttClient extends EventEmitter { this.client.subscribe(startTopics, (err) => { if (err) { - logger.error('Unable to subscribe to status topics'); + logger.error('Unable to subscribe to status topics', err); } this.emit(MqttClient.EVENTS.connected); @@ -118,66 +118,70 @@ class MqttClient extends EventEmitter { }); this.client.on('message', (topic, message) => { - if (startTopics.includes(topic)) { - logger.info('Home Assistant has started. lets do discovery.'); - this.emit(MqttClient.EVENTS.connected); - } else { - const decodedTopic = decodeTopic(topic); - if (decodedTopic) { - let device = this.deviceRegistry.getDevice(decodedTopic.id); - - const messageString = message.toString(); - const isJsonMessage = messageString.startsWith('{'); - const command = isJsonMessage ? JSON.parse(messageString) : messageString; - - if ( - !isJsonMessage - && messageString === 'ON' - && this.deviceRegistry.getScene(decodedTopic.id) - ) { - // Guess that id that got state command without dim value belongs to Scene, not Device - // This guess could very well be wrong depending on the installation... - logger.warn( - `Device id ${decodedTopic.id} belongs to both scene and device, guessing Scene is what should be set to ON. ` - + 'OFF commands still sent to device.', - ); - device = this.deviceRegistry.getScene(decodedTopic.id); - } - const deviceName = device ? device.name : ''; - - switch (decodedTopic.command) { - case 'set': - logger.verbose( - `Got mqtt SET command for ${decodedTopic.type}, ${deviceName} (${decodedTopic.id}): ${messageString}`, - ); - - if (device) { - this.emit(MqttClient.EVENTS.stateChanged, device, command); - } else { - logger.warn( - `Device for topic ${topic} not found! Can happen if HA calls previously existing devices.`, - ); - } - break; - case 'state': - case 'config': - case 'availability': - logger.verbose( - `Sent mqtt ${decodedTopic.command} command for ${ - decodedTopic.type - }, ${deviceName} (${decodedTopic.id}). ${ - decodedTopic.command === 'availability' ? messageString : '' - }`, - ); - break; - default: - logger.verbose(`Warning: Unknown command ${decodedTopic.command} in decoded topic`); - } + try { + if (startTopics.includes(topic)) { + logger.info('Home Assistant has started. lets do discovery.'); + this.emit(MqttClient.EVENTS.connected); } else { - logger.verbose( - `Warning: Got unrecognized mqtt command on '${topic}': ${message.toString()}`, - ); + const decodedTopic = decodeTopic(topic); + if (decodedTopic) { + let device = this.deviceRegistry.getDevice(decodedTopic.id); + + const messageString = message.toString(); + const isJsonMessage = messageString.startsWith('{'); + const command = isJsonMessage ? JSON.parse(messageString) : messageString; + + if ( + !isJsonMessage + && messageString === 'ON' + && this.deviceRegistry.getScene(decodedTopic.id) + ) { + // Guess that id that got state command without dim value belongs to Scene, not Device + // This guess could very well be wrong depending on the installation... + logger.warn( + `Device id ${decodedTopic.id} belongs to both scene and device, guessing Scene is what should be set to ON. ` + + 'OFF commands still sent to device.', + ); + device = this.deviceRegistry.getScene(decodedTopic.id); + } + const deviceName = device ? device.name : ''; + + switch (decodedTopic.command) { + case 'set': + logger.verbose( + `Got mqtt SET command for ${decodedTopic.type}, ${deviceName} (${decodedTopic.id}): ${messageString}`, + ); + + if (device) { + this.emit(MqttClient.EVENTS.stateChanged, device, command); + } else { + logger.warn( + `Device for topic ${topic} not found! Can happen if HA calls previously existing devices.`, + ); + } + break; + case 'state': + case 'config': + case 'availability': + logger.verbose( + `Sent mqtt ${decodedTopic.command} command for ${ + decodedTopic.type + }, ${deviceName} (${decodedTopic.id}). ${ + decodedTopic.command === 'availability' ? messageString : '' + }`, + ); + break; + default: + logger.verbose(`Warning: Unknown command ${decodedTopic.command} in decoded topic`); + } + } else { + logger.verbose( + `Warning: Got unrecognized mqtt command on '${topic}': ${message.toString()}`, + ); + } } + } catch (err) { + logger.error(`Error processing mqtt message on topic ${topic}`, err); } }); } diff --git a/plejd/PlejdAddon.js b/plejd/PlejdAddon.js index 0f3c6f2..196d986 100644 --- a/plejd/PlejdAddon.js +++ b/plejd/PlejdAddon.js @@ -104,7 +104,7 @@ class PlejdAddon extends EventEmitter { this.plejdDeviceCommunication.turnOff(deviceId, commandObj); } } catch (err) { - logger.error('Error in MqttClient.stateChanged callback in main.js', err); + logger.error('Error in MqttClient.stateChanged callback', err); } }); @@ -117,18 +117,18 @@ class PlejdAddon extends EventEmitter { try { this.mqttClient.updateState(deviceId, command); } catch (err) { - logger.error('Error in PlejdService.stateChanged callback in main.js', err); + logger.error('Error in PlejdService.stateChanged callback', err); } }, ); this.plejdDeviceCommunication.on( PlejdDeviceCommunication.EVENTS.sceneTriggered, - (deviceId, sceneId) => { + (sceneId) => { try { this.mqttClient.sceneTriggered(sceneId); } catch (err) { - logger.error('Error in PlejdService.sceneTriggered callback in main.js', err); + logger.error('Error in PlejdService.sceneTriggered callback', err); } }, ); diff --git a/plejd/PlejdBLEHandler.js b/plejd/PlejdBLEHandler.js index c6ea2d8..05375d8 100644 --- a/plejd/PlejdBLEHandler.js +++ b/plejd/PlejdBLEHandler.js @@ -85,6 +85,12 @@ class PlejBLEHandler extends EventEmitter { } cleanup() { + logger.verbose('cleanup() - Clearing ping interval and clock update timer'); + clearInterval(this.pingRef); + clearTimeout(this.requestCurrentPlejdTimeRef); + + logger.verbose('Removing listeners to write events, bus events and objectManager...'); + this.removeAllListeners(PlejBLEHandler.EVENTS.writeFailed); this.removeAllListeners(PlejBLEHandler.EVENTS.writeSuccess); @@ -456,23 +462,21 @@ class PlejBLEHandler extends EventEmitter { } async startReconnectPeriodicallyLoop() { - logger.verbose('startReconnectPeriodicallyLoop'); + logger.info('Starting reconnect loop...'); if (this.reconnectInProgress) { logger.debug('Reconnect already in progress. Skipping this call.'); return; } - clearInterval(this.pingRef); - clearTimeout(this.writeQueueRef); - clearTimeout(this.requestCurrentPlejdTimeRef); this.reconnectInProgress = true; /* eslint-disable no-await-in-loop */ // eslint-disable-next-line no-constant-condition while (true) { try { + logger.verbose('Reconnect: Clean up, emit reconnect event, wait 5s and the re-init...'); this.cleanup(); - await delay(5000); this.emit(PlejBLEHandler.EVENTS.reconnecting); + await delay(5000); logger.info('Reconnecting BLE...'); await this.init(); break; diff --git a/plejd/PlejdDeviceCommunication.js b/plejd/PlejdDeviceCommunication.js index 6497e9b..4fafb6c 100644 --- a/plejd/PlejdDeviceCommunication.js +++ b/plejd/PlejdDeviceCommunication.js @@ -8,9 +8,10 @@ const { COMMANDS } = constants; const logger = Logger.getLogger('device-comm'); const MAX_TRANSITION_STEPS_PER_SECOND = 5; // Could be made a setting -const MAX_RETRY_COUNT = 5; // Could be made a setting +const MAX_RETRY_COUNT = 10; // Could be made a setting class PlejdDeviceCommunication extends EventEmitter { + bleConnected; bleDeviceTransitionTimers = {}; plejdBleHandler; config; @@ -33,6 +34,7 @@ class PlejdDeviceCommunication extends EventEmitter { } cleanup() { + Object.values(this.bleDeviceTransitionTimers).forEach((t) => clearTimeout(t)); this.plejdBleHandler.cleanup(); this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.commandReceived); this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.connected); @@ -41,17 +43,21 @@ class PlejdDeviceCommunication extends EventEmitter { async init() { try { + this.cleanup(); + this.bleConnected = false; // eslint-disable-next-line max-len this.plejdBleHandler.on(PlejBLEHandler.EVENTS.commandReceived, (deviceId, command, data) => this._bleCommandReceived(deviceId, command, data)); this.plejdBleHandler.on(PlejBLEHandler.EVENTS.connected, () => { logger.info('Bluetooth connected. Plejd BLE up and running!'); - logger.verbose('Starting writeQueue loop.'); - this.startWriteQueue(); + logger.verbose(`Starting writeQueue loop. Write queue length: ${this.writeQueue.length}`); + this.bleConnected = true; + this._startWriteQueue(); }); this.plejdBleHandler.on(PlejBLEHandler.EVENTS.reconnecting, () => { logger.info('Bluetooth reconnecting...'); - logger.verbose('Stopping writeQueue loop until connection is established.'); + logger.verbose(`Stopping writeQueue loop until connection is established. Write queue length: ${this.writeQueue.length}`); + this.bleConnected = false; clearTimeout(this.writeQueueRef); }); @@ -101,7 +107,7 @@ class PlejdDeviceCommunication extends EventEmitter { state: 0, }); } else if (command === COMMANDS.TRIGGER_SCENE) { - this.emit(PlejdDeviceCommunication.EVENTS.sceneTriggered, deviceId, data.sceneId); + this.emit(PlejdDeviceCommunication.EVENTS.sceneTriggered, data.sceneId); } else { logger.warn(`Unknown ble command ${command}`); } @@ -226,16 +232,20 @@ class PlejdDeviceCommunication extends EventEmitter { }); } - startWriteQueue() { + _startWriteQueue() { logger.info('startWriteQueue()'); clearTimeout(this.writeQueueRef); - this.writeQueueRef = setTimeout(() => this.runWriteQueue(), this.config.writeQueueWaitTime); + this.writeQueueRef = setTimeout(() => this._runWriteQueue(), this.config.writeQueueWaitTime); } - async runWriteQueue() { + async _runWriteQueue() { try { while (this.writeQueue.length > 0) { + if (!this.bleConnected) { + logger.warn('BLE not connected, stopping write queue until connection is up again.'); + return; + } const queueItem = this.writeQueue.pop(); const deviceName = this.deviceRegistry.getDeviceName(queueItem.deviceId); logger.debug( @@ -285,7 +295,7 @@ class PlejdDeviceCommunication extends EventEmitter { logger.error('Error in writeQueue loop, values probably not written to Plejd', e); } - this.writeQueueRef = setTimeout(() => this.runWriteQueue(), this.config.writeQueueWaitTime); + this.writeQueueRef = setTimeout(() => this._runWriteQueue(), this.config.writeQueueWaitTime); } }