Как использовать fromEvent вместо bindCallback в методе объекта?

Мне нужно подключить метод обратного вызова onData rn-fetch-blob к наблюдаемому объекту.

Насколько я знаю, это не событие, fromEventPattern использовать нельзя. Я не понимаю, как использовать create, если это решение моей проблемы.

Я нашел bindCallback, который выглядел многообещающим, но в документации говорится, что мне, вероятно, следует использовать fromEvent вместо этого:

Обратите внимание, что Observable, созданный функцией вывода, всегда будет выдавать одно значение, а затем немедленно завершиться. Если func вызывает обратный вызов несколько раз, значения из последующих вызовов не будут отображаться в потоке. Если вам нужно прослушивать несколько вызовов, вы, вероятно, захотите использовать fromEvent или fromEventPattern.

Мне действительно нужно прослушать несколько звонков.

В любом случае я пытаюсь использовать bindCallback для метода объекта, как показано в документе. В моем файле Typescript:

import { bindCallback } from 'rxjs';

В моем классе:

private emitter!: Observable<any>;

в приватном методе:

RNFetchBlob.fs
    .readStream(
      filePath,
      "utf8",
      -1,
      10
    )
    .then(ifstream => {
      ifstream.open();
      this.emitter = bindCallback(ifstream.onData);

но он не компилируется:

error TS2322: Type '() => Observable<string | number[]>' is not assignable to type 'Observable<any>'.
  Property '_isScalar' is missing in type '() => Observable<string | number[]>'.

Я действительно не понимаю, как использовать fromEvent в моем случае.

Любая помощь приветствуется.

РЕДАКТИРОВАТЬ: Добавлен рабочий код для тех, кто ищет ответ:

RNFetchBlob.fs
            .readStream(
              // file path
              peripheral.name,
              // encoding, should be one of `base64`, `utf8`, `ascii`
              "utf8",
              // (optional) buffer size, default to 4096 (4095 for BASE64 encoded data)
              // when reading file in BASE64 encoding, buffer size must be multiples of 3.
              -1,
              10
            )
            .then(ifstream => {
              ifstream.open();
              this.emitter = new Observable(subscriber => {
                ifstream.onData(chunk => {
                  // chunk will be a string when encoding is 'utf8'
                  logging.logWithTimestamp(`Received [${chunk}]`);
                  subscriber.next(chunk);
                });
                ifstream.onError(err => {
                  logging.logWithTimestamp(`oops [${err}]`);
                  subscriber.error(err);
                });

                ifstream.onEnd(() => {
                  subscriber.complete();
                });
              });
              this.rxSubscription = this.emitter
                .pipe(
                  concatMap(value =>
                    this.handleUpdatedValuesComingFromCSVFile(value)
                  )
                )
                .subscribe();

2 ответа

Решение

Не слишком знаком с rn-fetch-blob но, надеюсь, вы уловили идею, вы также возвращаете функцию для запуска логики очистки.

 const onDataObserevable=new Obserevable(obs=>{
    ifstream.onData(data=>obs.next(data))
    return ()=>{... you can add some clean up logic here like unsubcribe from source onData event}
 });

ОБНОВИТЬ

преобразовал всю цепочку в наблюдаемую, надеюсь, вы уловили идею

from(RNFetchBlob.fs.readStream(
              peripheral.name,
              "utf8",
              -1,
              10
            ))
            .pipe(mergeMap(ifstream => {
              ifstream.open();
              return new Observable(subscriber => {
                ifstream.onData(chunk => {
                  // chunk will be a string when encoding is 'utf8'
                  logging.logWithTimestamp(`Received [${chunk}]`);
                  subscriber.next(chunk);
                });

                ifstream.onError(err => {
                  logging.logWithTimestamp(`oops [${err}]`);
                  subscriber.error(err);
                });

                ifstream.onEnd(() => {
                  subscriber.complete();
                });

              });),
              mergeMap(value =>this.handleUpdatedValuesComingFromCSVFile(value)
                  )
              )

Вот как это сделать с fromEventPattern:

this.emitter = fromEventPattern(
  // tell fromEventPattern how to subscribe to the data
  handler => ifstream.onData(handler),
  // tell fromEventPattern how to unsubscribe from the data
  handler => ifstream.onData(null)
)
Другие вопросы по тегам