Node-Serialport: невозможно обработать входящие данные с помощью RxJS Observable при применении тайм-аута

В настоящее время я работаю над проектом NodeJS, используя serialport модуль в сочетании с RxJS Observables. Предполагаемый "поток" / вариант использования выглядит следующим образом:

  1. название последовательного порта portName отправляется через последовательный порт
  2. поскольку RxD и TxD связаны друг с другом, данные "отражаются" на аппаратной стороне
  3. данные читаются через последовательный порт
  4. входящие данные обрабатываются readline-parserи передан в RxJS Observable
  5. если читатьdataравно ранее отправленномуportName, наблюдаемые больше не нужны и будут завершены observer.complete()

Мне удалось реализовать вышеупомянутый поток, но нужно сделать некоторые дополнительные реализации, такие как

  1. тайм-аут, если данные не получены в течение определенного периода времени
  2. повторяет попытку отправить команду снова в случае тайм-аута или других ошибок

Я работаю над реализацией тайм-аута и попробовал оба NodeJS'setTimeout()и собственный RxJS timeout функция При применении любой функции тайм-аута данные, по-видимому, не считываются / извлекаются последовательным портом, что, в свою очередь, вызывает ошибку тайм-аута.

Предполагая, что данных нет, на первый взгляд это выглядит вполне нормально, поскольку время ожидания делает то, что должно. Однако мне удалось дважды проверить, что нужные данные отправляются в порт, используя не только программный эмулируемый последовательный порт, но и два преобразователя USB-to-Serial CP2102 (см. Комментарии в коде для получения дополнительной информации).):

'use strict';
const Rx = require('rxjs');
const { interval } = require('rxjs');
const { timeout } = require('rxjs/operators');
const SerialPort = require('serialport');
const Readline = require('@serialport/parser-readline');


// `tries` is needed for later implementation of communcation retries
const myObservable = (portName, tries) => {
  const port = new SerialPort(portName);
  const parser = port.pipe( new Readline() );
  port.write(`${portName}\n`);

  return Rx.Observable
    .create( (observer) => {
      parser.on('data', (data) => {
        observer.next(data);
        console.log(`Detection will be: ${data == portName} (${data} vs. ${portName})`);
        if (data == portName)
        {
          port.close( (err) => {
            if (err)
            {
              console.log(`Error on closing serial port: ${err}`);
              observer.error(err);
            }
          });
          observer.complete();
        }
      })
    })
    // `timeout` is needed for later implementation of communication timeout, see comment at end of code
    // .pipe(timeout(10000))
}


const myObserver = {
  next:     x => console.log(`got data from observable: ${x}`),
  error:    err => console.error(`something wrong occurred: ${err}`),
  complete: () => console.log('done'),
};


console.log('before subscribe');
const sub = myObservable('/dev/tty.usbserial-FTG7L3FX', null).subscribe(myObserver);
// double-checked that data is sent by using software (created an emulated pair of virtual serial ports with `socat -d -d pty,raw,echo=0 pty,raw,echo=0`)
// --> data is sent, but not read/retrieved when either using `setTimeout` or RxJS' own `timeout()`
// const sub = myObservable('/dev/ttys003', null).subscribe(myObserver);
// double-checked that data is sent by using hardware interfaces (used two CP2102 modules with pairwise-crossed RxD and TxD)
// --> data is sent, but not read/retrieved when either using `setTimeout` or RxJS' own `timeout()`
// const sub = myObservable('/dev/tty.SLAB_USBtoUART', null).subscribe(myObserver);
console.log('after subscribe');


// when commenting the following `setTimeout()` data is retrieved, but does not work with `setTimeout()`
// so tried to use RxJS' `timeout()` operator --> not working either
// setTimeout(() => {
//   sub.unsubscribe();
//   console.log('unsubscribed');
// }, 10000);

Что мне здесь не хватает? Почему данные отправляются, когда тайм-аут не используется, а не при использовании функции тайм-аута?


Обновления в связи с дальнейшим расследованием:

  1. когда timeout() применяется, данные отправляются после истечения времени ожидания, что означает, что время ожидания запускает как отправку данных, так и выход из наблюдаемой, поскольку истекло время ожидания. Итак .timeout() Похоже, не применяется к возвращенным Rx.Observable, но ко всей функции myObservable,

0 ответов

Другие вопросы по тегам