Stop write queue when BLE is not connected to avoid loosing commands

- Set max retry count to 10 since BLE loop will set channel to disconnected after 5 retries (or known unrecoverable errors)
This commit is contained in:
Victor Hagelbäck 2021-02-22 09:50:06 +01:00
parent e8ab13aed3
commit 4d5a4ddbe3
5 changed files with 99 additions and 81 deletions

View file

@ -41,10 +41,10 @@ class DeviceRegistry {
); );
if (added.roomId) { if (added.roomId) {
this.deviceIdsByRoom[added.roomId] = [ const room = this.deviceIdsByRoom[added.roomId] || [];
...(this.deviceIdsByRoom[added.roomId] || []), if (!room.includes(added.roomId)) {
added.id, this.deviceIdsByRoom[added.roomId] = [...room, added.roomId];
]; }
logger.verbose( logger.verbose(
`Added to room #${added.roomId}: ${JSON.stringify(this.deviceIdsByRoom[added.roomId])}`, `Added to room #${added.roomId}: ${JSON.stringify(this.deviceIdsByRoom[added.roomId])}`,
); );

View file

@ -99,7 +99,7 @@ class MqttClient extends EventEmitter {
this.client.subscribe(startTopics, (err) => { this.client.subscribe(startTopics, (err) => {
if (err) { if (err) {
logger.error('Unable to subscribe to status topics'); logger.error('Unable to subscribe to status topics', err);
} }
this.emit(MqttClient.EVENTS.connected); this.emit(MqttClient.EVENTS.connected);
@ -118,6 +118,7 @@ class MqttClient extends EventEmitter {
}); });
this.client.on('message', (topic, message) => { this.client.on('message', (topic, message) => {
try {
if (startTopics.includes(topic)) { if (startTopics.includes(topic)) {
logger.info('Home Assistant has started. lets do discovery.'); logger.info('Home Assistant has started. lets do discovery.');
this.emit(MqttClient.EVENTS.connected); this.emit(MqttClient.EVENTS.connected);
@ -179,6 +180,9 @@ class MqttClient extends EventEmitter {
); );
} }
} }
} catch (err) {
logger.error(`Error processing mqtt message on topic ${topic}`, err);
}
}); });
} }

View file

@ -104,7 +104,7 @@ class PlejdAddon extends EventEmitter {
this.plejdDeviceCommunication.turnOff(deviceId, commandObj); this.plejdDeviceCommunication.turnOff(deviceId, commandObj);
} }
} catch (err) { } catch (err) {
logger.error('Error in MqttClient.stateChanged callback in main.js', err); logger.error('Error in MqttClient.stateChanged callback', err);
} }
}); });
@ -117,18 +117,18 @@ class PlejdAddon extends EventEmitter {
try { try {
this.mqttClient.updateState(deviceId, command); this.mqttClient.updateState(deviceId, command);
} catch (err) { } catch (err) {
logger.error('Error in PlejdService.stateChanged callback in main.js', err); logger.error('Error in PlejdService.stateChanged callback', err);
} }
}, },
); );
this.plejdDeviceCommunication.on( this.plejdDeviceCommunication.on(
PlejdDeviceCommunication.EVENTS.sceneTriggered, PlejdDeviceCommunication.EVENTS.sceneTriggered,
(deviceId, sceneId) => { (sceneId) => {
try { try {
this.mqttClient.sceneTriggered(sceneId); this.mqttClient.sceneTriggered(sceneId);
} catch (err) { } catch (err) {
logger.error('Error in PlejdService.sceneTriggered callback in main.js', err); logger.error('Error in PlejdService.sceneTriggered callback', err);
} }
}, },
); );

View file

@ -85,6 +85,12 @@ class PlejBLEHandler extends EventEmitter {
} }
cleanup() { cleanup() {
logger.verbose('cleanup() - Clearing ping interval and clock update timer');
clearInterval(this.pingRef);
clearTimeout(this.requestCurrentPlejdTimeRef);
logger.verbose('Removing listeners to write events, bus events and objectManager...');
this.removeAllListeners(PlejBLEHandler.EVENTS.writeFailed); this.removeAllListeners(PlejBLEHandler.EVENTS.writeFailed);
this.removeAllListeners(PlejBLEHandler.EVENTS.writeSuccess); this.removeAllListeners(PlejBLEHandler.EVENTS.writeSuccess);
@ -456,23 +462,21 @@ class PlejBLEHandler extends EventEmitter {
} }
async startReconnectPeriodicallyLoop() { async startReconnectPeriodicallyLoop() {
logger.verbose('startReconnectPeriodicallyLoop'); logger.info('Starting reconnect loop...');
if (this.reconnectInProgress) { if (this.reconnectInProgress) {
logger.debug('Reconnect already in progress. Skipping this call.'); logger.debug('Reconnect already in progress. Skipping this call.');
return; return;
} }
clearInterval(this.pingRef);
clearTimeout(this.writeQueueRef);
clearTimeout(this.requestCurrentPlejdTimeRef);
this.reconnectInProgress = true; this.reconnectInProgress = true;
/* eslint-disable no-await-in-loop */ /* eslint-disable no-await-in-loop */
// eslint-disable-next-line no-constant-condition // eslint-disable-next-line no-constant-condition
while (true) { while (true) {
try { try {
logger.verbose('Reconnect: Clean up, emit reconnect event, wait 5s and the re-init...');
this.cleanup(); this.cleanup();
await delay(5000);
this.emit(PlejBLEHandler.EVENTS.reconnecting); this.emit(PlejBLEHandler.EVENTS.reconnecting);
await delay(5000);
logger.info('Reconnecting BLE...'); logger.info('Reconnecting BLE...');
await this.init(); await this.init();
break; break;

View file

@ -8,9 +8,10 @@ const { COMMANDS } = constants;
const logger = Logger.getLogger('device-comm'); const logger = Logger.getLogger('device-comm');
const MAX_TRANSITION_STEPS_PER_SECOND = 5; // Could be made a setting const MAX_TRANSITION_STEPS_PER_SECOND = 5; // Could be made a setting
const MAX_RETRY_COUNT = 5; // Could be made a setting const MAX_RETRY_COUNT = 10; // Could be made a setting
class PlejdDeviceCommunication extends EventEmitter { class PlejdDeviceCommunication extends EventEmitter {
bleConnected;
bleDeviceTransitionTimers = {}; bleDeviceTransitionTimers = {};
plejdBleHandler; plejdBleHandler;
config; config;
@ -33,6 +34,7 @@ class PlejdDeviceCommunication extends EventEmitter {
} }
cleanup() { cleanup() {
Object.values(this.bleDeviceTransitionTimers).forEach((t) => clearTimeout(t));
this.plejdBleHandler.cleanup(); this.plejdBleHandler.cleanup();
this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.commandReceived); this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.commandReceived);
this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.connected); this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.connected);
@ -41,17 +43,21 @@ class PlejdDeviceCommunication extends EventEmitter {
async init() { async init() {
try { try {
this.cleanup();
this.bleConnected = false;
// eslint-disable-next-line max-len // eslint-disable-next-line max-len
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.commandReceived, (deviceId, command, data) => this._bleCommandReceived(deviceId, command, data)); this.plejdBleHandler.on(PlejBLEHandler.EVENTS.commandReceived, (deviceId, command, data) => this._bleCommandReceived(deviceId, command, data));
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.connected, () => { this.plejdBleHandler.on(PlejBLEHandler.EVENTS.connected, () => {
logger.info('Bluetooth connected. Plejd BLE up and running!'); logger.info('Bluetooth connected. Plejd BLE up and running!');
logger.verbose('Starting writeQueue loop.'); logger.verbose(`Starting writeQueue loop. Write queue length: ${this.writeQueue.length}`);
this.startWriteQueue(); this.bleConnected = true;
this._startWriteQueue();
}); });
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.reconnecting, () => { this.plejdBleHandler.on(PlejBLEHandler.EVENTS.reconnecting, () => {
logger.info('Bluetooth reconnecting...'); logger.info('Bluetooth reconnecting...');
logger.verbose('Stopping writeQueue loop until connection is established.'); logger.verbose(`Stopping writeQueue loop until connection is established. Write queue length: ${this.writeQueue.length}`);
this.bleConnected = false;
clearTimeout(this.writeQueueRef); clearTimeout(this.writeQueueRef);
}); });
@ -101,7 +107,7 @@ class PlejdDeviceCommunication extends EventEmitter {
state: 0, state: 0,
}); });
} else if (command === COMMANDS.TRIGGER_SCENE) { } else if (command === COMMANDS.TRIGGER_SCENE) {
this.emit(PlejdDeviceCommunication.EVENTS.sceneTriggered, deviceId, data.sceneId); this.emit(PlejdDeviceCommunication.EVENTS.sceneTriggered, data.sceneId);
} else { } else {
logger.warn(`Unknown ble command ${command}`); logger.warn(`Unknown ble command ${command}`);
} }
@ -226,16 +232,20 @@ class PlejdDeviceCommunication extends EventEmitter {
}); });
} }
startWriteQueue() { _startWriteQueue() {
logger.info('startWriteQueue()'); logger.info('startWriteQueue()');
clearTimeout(this.writeQueueRef); clearTimeout(this.writeQueueRef);
this.writeQueueRef = setTimeout(() => this.runWriteQueue(), this.config.writeQueueWaitTime); this.writeQueueRef = setTimeout(() => this._runWriteQueue(), this.config.writeQueueWaitTime);
} }
async runWriteQueue() { async _runWriteQueue() {
try { try {
while (this.writeQueue.length > 0) { while (this.writeQueue.length > 0) {
if (!this.bleConnected) {
logger.warn('BLE not connected, stopping write queue until connection is up again.');
return;
}
const queueItem = this.writeQueue.pop(); const queueItem = this.writeQueue.pop();
const deviceName = this.deviceRegistry.getDeviceName(queueItem.deviceId); const deviceName = this.deviceRegistry.getDeviceName(queueItem.deviceId);
logger.debug( logger.debug(
@ -285,7 +295,7 @@ class PlejdDeviceCommunication extends EventEmitter {
logger.error('Error in writeQueue loop, values probably not written to Plejd', e); logger.error('Error in writeQueue loop, values probably not written to Plejd', e);
} }
this.writeQueueRef = setTimeout(() => this.runWriteQueue(), this.config.writeQueueWaitTime); this.writeQueueRef = setTimeout(() => this._runWriteQueue(), this.config.writeQueueWaitTime);
} }
} }