Stop write queue when BLE is not connected to avoid loosing commands
- Set max retry count to 10 since BLE loop will set channel to disconnected after 5 retries (or known unrecoverable errors)
This commit is contained in:
parent
4591af57b9
commit
e7b8a5a82a
5 changed files with 99 additions and 82 deletions
|
|
@ -41,10 +41,10 @@ class DeviceRegistry {
|
||||||
);
|
);
|
||||||
|
|
||||||
if (added.roomId) {
|
if (added.roomId) {
|
||||||
this.deviceIdsByRoom[added.roomId] = [
|
const room = this.deviceIdsByRoom[added.roomId] || [];
|
||||||
...(this.deviceIdsByRoom[added.roomId] || []),
|
if (!room.includes(added.roomId)) {
|
||||||
added.id,
|
this.deviceIdsByRoom[added.roomId] = [...room, added.roomId];
|
||||||
];
|
}
|
||||||
logger.verbose(
|
logger.verbose(
|
||||||
`Added to room #${added.roomId}: ${JSON.stringify(this.deviceIdsByRoom[added.roomId])}`,
|
`Added to room #${added.roomId}: ${JSON.stringify(this.deviceIdsByRoom[added.roomId])}`,
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -99,7 +99,7 @@ class MqttClient extends EventEmitter {
|
||||||
|
|
||||||
this.client.subscribe(startTopics, (err) => {
|
this.client.subscribe(startTopics, (err) => {
|
||||||
if (err) {
|
if (err) {
|
||||||
logger.error('Unable to subscribe to status topics');
|
logger.error('Unable to subscribe to status topics', err);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.emit(MqttClient.EVENTS.connected);
|
this.emit(MqttClient.EVENTS.connected);
|
||||||
|
|
@ -118,67 +118,70 @@ class MqttClient extends EventEmitter {
|
||||||
});
|
});
|
||||||
|
|
||||||
this.client.on('message', (topic, message) => {
|
this.client.on('message', (topic, message) => {
|
||||||
if (startTopics.includes(topic)) {
|
try {
|
||||||
logger.info('Home Assistant has started. lets do discovery.');
|
if (startTopics.includes(topic)) {
|
||||||
this.emit(MqttClient.EVENTS.connected);
|
logger.info('Home Assistant has started. lets do discovery.');
|
||||||
} else {
|
this.emit(MqttClient.EVENTS.connected);
|
||||||
const decodedTopic = decodeTopic(topic);
|
|
||||||
if (decodedTopic) {
|
|
||||||
let device = this.deviceRegistry.getDevice(decodedTopic.id);
|
|
||||||
|
|
||||||
const messageString = message.toString();
|
|
||||||
const isJsonMessage = messageString.startsWith('{');
|
|
||||||
const command = isJsonMessage ? JSON.parse(messageString) : messageString;
|
|
||||||
|
|
||||||
if (
|
|
||||||
!isJsonMessage
|
|
||||||
&& messageString === 'ON'
|
|
||||||
&& this.deviceRegistry.getScene(decodedTopic.id)
|
|
||||||
) {
|
|
||||||
// Guess that id that got state command without dim value belongs to Scene, not Device
|
|
||||||
// This guess could very well be wrong depending on the installation...
|
|
||||||
logger.warn(
|
|
||||||
`Device id ${decodedTopic.id} belongs to both scene and device, guessing Scene is what should be set to ON. `
|
|
||||||
+ 'OFF commands still sent to device.',
|
|
||||||
);
|
|
||||||
device = this.deviceRegistry.getScene(decodedTopic.id);
|
|
||||||
}
|
|
||||||
|
|
||||||
const deviceName = device ? device.name : '';
|
|
||||||
|
|
||||||
switch (decodedTopic.command) {
|
|
||||||
case 'set':
|
|
||||||
logger.verbose(
|
|
||||||
`Got mqtt SET command for ${decodedTopic.type}, ${deviceName} (${decodedTopic.id}): ${messageString}`,
|
|
||||||
);
|
|
||||||
|
|
||||||
if (device) {
|
|
||||||
this.emit(MqttClient.EVENTS.stateChanged, device, command);
|
|
||||||
} else {
|
|
||||||
logger.warn(
|
|
||||||
`Device for topic ${topic} not found! Can happen if HA calls previously existing devices.`,
|
|
||||||
);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
case 'state':
|
|
||||||
case 'config':
|
|
||||||
case 'availability':
|
|
||||||
logger.verbose(
|
|
||||||
`Sent mqtt ${decodedTopic.command} command for ${
|
|
||||||
decodedTopic.type
|
|
||||||
}, ${deviceName} (${decodedTopic.id}). ${
|
|
||||||
decodedTopic.command === 'availability' ? messageString : ''
|
|
||||||
}`,
|
|
||||||
);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
logger.verbose(`Warning: Unknown command ${decodedTopic.command} in decoded topic`);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
logger.verbose(
|
const decodedTopic = decodeTopic(topic);
|
||||||
`Warning: Got unrecognized mqtt command on '${topic}': ${message.toString()}`,
|
if (decodedTopic) {
|
||||||
);
|
let device = this.deviceRegistry.getDevice(decodedTopic.id);
|
||||||
|
|
||||||
|
const messageString = message.toString();
|
||||||
|
const isJsonMessage = messageString.startsWith('{');
|
||||||
|
const command = isJsonMessage ? JSON.parse(messageString) : messageString;
|
||||||
|
|
||||||
|
if (
|
||||||
|
!isJsonMessage
|
||||||
|
&& messageString === 'ON'
|
||||||
|
&& this.deviceRegistry.getScene(decodedTopic.id)
|
||||||
|
) {
|
||||||
|
// Guess that id that got state command without dim value belongs to Scene, not Device
|
||||||
|
// This guess could very well be wrong depending on the installation...
|
||||||
|
logger.warn(
|
||||||
|
`Device id ${decodedTopic.id} belongs to both scene and device, guessing Scene is what should be set to ON. `
|
||||||
|
+ 'OFF commands still sent to device.',
|
||||||
|
);
|
||||||
|
device = this.deviceRegistry.getScene(decodedTopic.id);
|
||||||
|
}
|
||||||
|
const deviceName = device ? device.name : '';
|
||||||
|
|
||||||
|
switch (decodedTopic.command) {
|
||||||
|
case 'set':
|
||||||
|
logger.verbose(
|
||||||
|
`Got mqtt SET command for ${decodedTopic.type}, ${deviceName} (${decodedTopic.id}): ${messageString}`,
|
||||||
|
);
|
||||||
|
|
||||||
|
if (device) {
|
||||||
|
this.emit(MqttClient.EVENTS.stateChanged, device, command);
|
||||||
|
} else {
|
||||||
|
logger.warn(
|
||||||
|
`Device for topic ${topic} not found! Can happen if HA calls previously existing devices.`,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case 'state':
|
||||||
|
case 'config':
|
||||||
|
case 'availability':
|
||||||
|
logger.verbose(
|
||||||
|
`Sent mqtt ${decodedTopic.command} command for ${
|
||||||
|
decodedTopic.type
|
||||||
|
}, ${deviceName} (${decodedTopic.id}). ${
|
||||||
|
decodedTopic.command === 'availability' ? messageString : ''
|
||||||
|
}`,
|
||||||
|
);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
logger.verbose(`Warning: Unknown command ${decodedTopic.command} in decoded topic`);
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
logger.verbose(
|
||||||
|
`Warning: Got unrecognized mqtt command on '${topic}': ${message.toString()}`,
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
} catch (err) {
|
||||||
|
logger.error(`Error processing mqtt message on topic ${topic}`, err);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -104,7 +104,7 @@ class PlejdAddon extends EventEmitter {
|
||||||
this.plejdDeviceCommunication.turnOff(deviceId, commandObj);
|
this.plejdDeviceCommunication.turnOff(deviceId, commandObj);
|
||||||
}
|
}
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error('Error in MqttClient.stateChanged callback in main.js', err);
|
logger.error('Error in MqttClient.stateChanged callback', err);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -117,18 +117,18 @@ class PlejdAddon extends EventEmitter {
|
||||||
try {
|
try {
|
||||||
this.mqttClient.updateState(deviceId, command);
|
this.mqttClient.updateState(deviceId, command);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error('Error in PlejdService.stateChanged callback in main.js', err);
|
logger.error('Error in PlejdService.stateChanged callback', err);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
this.plejdDeviceCommunication.on(
|
this.plejdDeviceCommunication.on(
|
||||||
PlejdDeviceCommunication.EVENTS.sceneTriggered,
|
PlejdDeviceCommunication.EVENTS.sceneTriggered,
|
||||||
(deviceId, sceneId) => {
|
(sceneId) => {
|
||||||
try {
|
try {
|
||||||
this.mqttClient.sceneTriggered(sceneId);
|
this.mqttClient.sceneTriggered(sceneId);
|
||||||
} catch (err) {
|
} catch (err) {
|
||||||
logger.error('Error in PlejdService.sceneTriggered callback in main.js', err);
|
logger.error('Error in PlejdService.sceneTriggered callback', err);
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
|
||||||
|
|
@ -85,6 +85,12 @@ class PlejBLEHandler extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup() {
|
cleanup() {
|
||||||
|
logger.verbose('cleanup() - Clearing ping interval and clock update timer');
|
||||||
|
clearInterval(this.pingRef);
|
||||||
|
clearTimeout(this.requestCurrentPlejdTimeRef);
|
||||||
|
|
||||||
|
logger.verbose('Removing listeners to write events, bus events and objectManager...');
|
||||||
|
|
||||||
this.removeAllListeners(PlejBLEHandler.EVENTS.writeFailed);
|
this.removeAllListeners(PlejBLEHandler.EVENTS.writeFailed);
|
||||||
this.removeAllListeners(PlejBLEHandler.EVENTS.writeSuccess);
|
this.removeAllListeners(PlejBLEHandler.EVENTS.writeSuccess);
|
||||||
|
|
||||||
|
|
@ -456,23 +462,21 @@ class PlejBLEHandler extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
async startReconnectPeriodicallyLoop() {
|
async startReconnectPeriodicallyLoop() {
|
||||||
logger.verbose('startReconnectPeriodicallyLoop');
|
logger.info('Starting reconnect loop...');
|
||||||
if (this.reconnectInProgress) {
|
if (this.reconnectInProgress) {
|
||||||
logger.debug('Reconnect already in progress. Skipping this call.');
|
logger.debug('Reconnect already in progress. Skipping this call.');
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
clearInterval(this.pingRef);
|
|
||||||
clearTimeout(this.writeQueueRef);
|
|
||||||
clearTimeout(this.requestCurrentPlejdTimeRef);
|
|
||||||
this.reconnectInProgress = true;
|
this.reconnectInProgress = true;
|
||||||
|
|
||||||
/* eslint-disable no-await-in-loop */
|
/* eslint-disable no-await-in-loop */
|
||||||
// eslint-disable-next-line no-constant-condition
|
// eslint-disable-next-line no-constant-condition
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
|
logger.verbose('Reconnect: Clean up, emit reconnect event, wait 5s and the re-init...');
|
||||||
this.cleanup();
|
this.cleanup();
|
||||||
await delay(5000);
|
|
||||||
this.emit(PlejBLEHandler.EVENTS.reconnecting);
|
this.emit(PlejBLEHandler.EVENTS.reconnecting);
|
||||||
|
await delay(5000);
|
||||||
logger.info('Reconnecting BLE...');
|
logger.info('Reconnecting BLE...');
|
||||||
await this.init();
|
await this.init();
|
||||||
break;
|
break;
|
||||||
|
|
|
||||||
|
|
@ -8,9 +8,10 @@ const { COMMANDS } = constants;
|
||||||
const logger = Logger.getLogger('device-comm');
|
const logger = Logger.getLogger('device-comm');
|
||||||
|
|
||||||
const MAX_TRANSITION_STEPS_PER_SECOND = 5; // Could be made a setting
|
const MAX_TRANSITION_STEPS_PER_SECOND = 5; // Could be made a setting
|
||||||
const MAX_RETRY_COUNT = 5; // Could be made a setting
|
const MAX_RETRY_COUNT = 10; // Could be made a setting
|
||||||
|
|
||||||
class PlejdDeviceCommunication extends EventEmitter {
|
class PlejdDeviceCommunication extends EventEmitter {
|
||||||
|
bleConnected;
|
||||||
bleDeviceTransitionTimers = {};
|
bleDeviceTransitionTimers = {};
|
||||||
plejdBleHandler;
|
plejdBleHandler;
|
||||||
config;
|
config;
|
||||||
|
|
@ -33,6 +34,7 @@ class PlejdDeviceCommunication extends EventEmitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
cleanup() {
|
cleanup() {
|
||||||
|
Object.values(this.bleDeviceTransitionTimers).forEach((t) => clearTimeout(t));
|
||||||
this.plejdBleHandler.cleanup();
|
this.plejdBleHandler.cleanup();
|
||||||
this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.commandReceived);
|
this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.commandReceived);
|
||||||
this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.connected);
|
this.plejdBleHandler.removeAllListeners(PlejBLEHandler.EVENTS.connected);
|
||||||
|
|
@ -41,17 +43,21 @@ class PlejdDeviceCommunication extends EventEmitter {
|
||||||
|
|
||||||
async init() {
|
async init() {
|
||||||
try {
|
try {
|
||||||
|
this.cleanup();
|
||||||
|
this.bleConnected = false;
|
||||||
// eslint-disable-next-line max-len
|
// eslint-disable-next-line max-len
|
||||||
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.commandReceived, (deviceId, command, data) => this._bleCommandReceived(deviceId, command, data));
|
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.commandReceived, (deviceId, command, data) => this._bleCommandReceived(deviceId, command, data));
|
||||||
|
|
||||||
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.connected, () => {
|
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.connected, () => {
|
||||||
logger.info('Bluetooth connected. Plejd BLE up and running!');
|
logger.info('Bluetooth connected. Plejd BLE up and running!');
|
||||||
logger.verbose('Starting writeQueue loop.');
|
logger.verbose(`Starting writeQueue loop. Write queue length: ${this.writeQueue.length}`);
|
||||||
this.startWriteQueue();
|
this.bleConnected = true;
|
||||||
|
this._startWriteQueue();
|
||||||
});
|
});
|
||||||
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.reconnecting, () => {
|
this.plejdBleHandler.on(PlejBLEHandler.EVENTS.reconnecting, () => {
|
||||||
logger.info('Bluetooth reconnecting...');
|
logger.info('Bluetooth reconnecting...');
|
||||||
logger.verbose('Stopping writeQueue loop until connection is established.');
|
logger.verbose(`Stopping writeQueue loop until connection is established. Write queue length: ${this.writeQueue.length}`);
|
||||||
|
this.bleConnected = false;
|
||||||
clearTimeout(this.writeQueueRef);
|
clearTimeout(this.writeQueueRef);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
@ -101,7 +107,7 @@ class PlejdDeviceCommunication extends EventEmitter {
|
||||||
state: 0,
|
state: 0,
|
||||||
});
|
});
|
||||||
} else if (command === COMMANDS.TRIGGER_SCENE) {
|
} else if (command === COMMANDS.TRIGGER_SCENE) {
|
||||||
this.emit(PlejdDeviceCommunication.EVENTS.sceneTriggered, deviceId, data.sceneId);
|
this.emit(PlejdDeviceCommunication.EVENTS.sceneTriggered, data.sceneId);
|
||||||
} else {
|
} else {
|
||||||
logger.warn(`Unknown ble command ${command}`);
|
logger.warn(`Unknown ble command ${command}`);
|
||||||
}
|
}
|
||||||
|
|
@ -226,16 +232,20 @@ class PlejdDeviceCommunication extends EventEmitter {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
startWriteQueue() {
|
_startWriteQueue() {
|
||||||
logger.info('startWriteQueue()');
|
logger.info('startWriteQueue()');
|
||||||
clearTimeout(this.writeQueueRef);
|
clearTimeout(this.writeQueueRef);
|
||||||
|
|
||||||
this.writeQueueRef = setTimeout(() => this.runWriteQueue(), this.config.writeQueueWaitTime);
|
this.writeQueueRef = setTimeout(() => this._runWriteQueue(), this.config.writeQueueWaitTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
async runWriteQueue() {
|
async _runWriteQueue() {
|
||||||
try {
|
try {
|
||||||
while (this.writeQueue.length > 0) {
|
while (this.writeQueue.length > 0) {
|
||||||
|
if (!this.bleConnected) {
|
||||||
|
logger.warn('BLE not connected, stopping write queue until connection is up again.');
|
||||||
|
return;
|
||||||
|
}
|
||||||
const queueItem = this.writeQueue.pop();
|
const queueItem = this.writeQueue.pop();
|
||||||
const deviceName = this.deviceRegistry.getDeviceName(queueItem.deviceId);
|
const deviceName = this.deviceRegistry.getDeviceName(queueItem.deviceId);
|
||||||
logger.debug(
|
logger.debug(
|
||||||
|
|
@ -285,7 +295,7 @@ class PlejdDeviceCommunication extends EventEmitter {
|
||||||
logger.error('Error in writeQueue loop, values probably not written to Plejd', e);
|
logger.error('Error in writeQueue loop, values probably not written to Plejd', e);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.writeQueueRef = setTimeout(() => this.runWriteQueue(), this.config.writeQueueWaitTime);
|
this.writeQueueRef = setTimeout(() => this._runWriteQueue(), this.config.writeQueueWaitTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue