Node-Serialport: невозможно обработать входящие данные с помощью RxJS Observable при применении тайм-аута
В настоящее время я работаю над проектом NodeJS, используя serialport
модуль в сочетании с RxJS Observables. Предполагаемый "поток" / вариант использования выглядит следующим образом:
- название последовательного порта
portName
отправляется через последовательный порт - поскольку RxD и TxD связаны друг с другом, данные "отражаются" на аппаратной стороне
- данные читаются через последовательный порт
- входящие данные обрабатываются
readline-parser
и передан в RxJS Observable - если читать
data
равно ранее отправленномуportName
, наблюдаемые больше не нужны и будут завершеныobserver.complete()
Мне удалось реализовать вышеупомянутый поток, но нужно сделать некоторые дополнительные реализации, такие как
- тайм-аут, если данные не получены в течение определенного периода времени
- повторяет попытку отправить команду снова в случае тайм-аута или других ошибок
Я работаю над реализацией тайм-аута и попробовал оба NodeJS'setTimeout()
и собственный RxJS timeout
функция При применении любой функции тайм-аута данные, по-видимому, не считываются / извлекаются последовательным портом, что, в свою очередь, вызывает ошибку тайм-аута.
Предполагая, что данных нет, на первый взгляд это выглядит вполне нормально, поскольку время ожидания делает то, что должно. Однако мне удалось дважды проверить, что нужные данные отправляются в порт, используя не только программный эмулируемый последовательный порт, но и два преобразователя USB-to-Serial CP2102 (см. Комментарии в коде для получения дополнительной информации).):
'use strict';
const Rx = require('rxjs');
const { interval } = require('rxjs');
const { timeout } = require('rxjs/operators');
const SerialPort = require('serialport');
const Readline = require('@serialport/parser-readline');
// `tries` is needed for later implementation of communcation retries
const myObservable = (portName, tries) => {
const port = new SerialPort(portName);
const parser = port.pipe( new Readline() );
port.write(`${portName}\n`);
return Rx.Observable
.create( (observer) => {
parser.on('data', (data) => {
observer.next(data);
console.log(`Detection will be: ${data == portName} (${data} vs. ${portName})`);
if (data == portName)
{
port.close( (err) => {
if (err)
{
console.log(`Error on closing serial port: ${err}`);
observer.error(err);
}
});
observer.complete();
}
})
})
// `timeout` is needed for later implementation of communication timeout, see comment at end of code
// .pipe(timeout(10000))
}
const myObserver = {
next: x => console.log(`got data from observable: ${x}`),
error: err => console.error(`something wrong occurred: ${err}`),
complete: () => console.log('done'),
};
console.log('before subscribe');
const sub = myObservable('/dev/tty.usbserial-FTG7L3FX', null).subscribe(myObserver);
// double-checked that data is sent by using software (created an emulated pair of virtual serial ports with `socat -d -d pty,raw,echo=0 pty,raw,echo=0`)
// --> data is sent, but not read/retrieved when either using `setTimeout` or RxJS' own `timeout()`
// const sub = myObservable('/dev/ttys003', null).subscribe(myObserver);
// double-checked that data is sent by using hardware interfaces (used two CP2102 modules with pairwise-crossed RxD and TxD)
// --> data is sent, but not read/retrieved when either using `setTimeout` or RxJS' own `timeout()`
// const sub = myObservable('/dev/tty.SLAB_USBtoUART', null).subscribe(myObserver);
console.log('after subscribe');
// when commenting the following `setTimeout()` data is retrieved, but does not work with `setTimeout()`
// so tried to use RxJS' `timeout()` operator --> not working either
// setTimeout(() => {
// sub.unsubscribe();
// console.log('unsubscribed');
// }, 10000);
Что мне здесь не хватает? Почему данные отправляются, когда тайм-аут не используется, а не при использовании функции тайм-аута?
Обновления в связи с дальнейшим расследованием:
- когда
timeout()
применяется, данные отправляются после истечения времени ожидания, что означает, что время ожидания запускает как отправку данных, так и выход из наблюдаемой, поскольку истекло время ожидания. Итак.timeout()
Похоже, не применяется к возвращеннымRx.Observable
, но ко всей функцииmyObservable
,