Как использовать fromEvent вместо bindCallback в методе объекта?
Мне нужно подключить метод обратного вызова onData rn-fetch-blob к наблюдаемому объекту.
Насколько я знаю, это не событие, fromEventPattern использовать нельзя. Я не понимаю, как использовать create, если это решение моей проблемы.
Я нашел bindCallback, который выглядел многообещающим, но в документации говорится, что мне, вероятно, следует использовать fromEvent вместо этого:
Обратите внимание, что Observable, созданный функцией вывода, всегда будет выдавать одно значение, а затем немедленно завершиться. Если func вызывает обратный вызов несколько раз, значения из последующих вызовов не будут отображаться в потоке. Если вам нужно прослушивать несколько вызовов, вы, вероятно, захотите использовать fromEvent или fromEventPattern.
Мне действительно нужно прослушать несколько звонков.
В любом случае я пытаюсь использовать bindCallback для метода объекта, как показано в документе. В моем файле Typescript:
import { bindCallback } from 'rxjs';
В моем классе:
private emitter!: Observable<any>;
в приватном методе:
RNFetchBlob.fs
.readStream(
filePath,
"utf8",
-1,
10
)
.then(ifstream => {
ifstream.open();
this.emitter = bindCallback(ifstream.onData);
но он не компилируется:
error TS2322: Type '() => Observable<string | number[]>' is not assignable to type 'Observable<any>'.
Property '_isScalar' is missing in type '() => Observable<string | number[]>'.
Я действительно не понимаю, как использовать fromEvent в моем случае.
Любая помощь приветствуется.
РЕДАКТИРОВАТЬ: Добавлен рабочий код для тех, кто ищет ответ:
RNFetchBlob.fs
.readStream(
// file path
peripheral.name,
// encoding, should be one of `base64`, `utf8`, `ascii`
"utf8",
// (optional) buffer size, default to 4096 (4095 for BASE64 encoded data)
// when reading file in BASE64 encoding, buffer size must be multiples of 3.
-1,
10
)
.then(ifstream => {
ifstream.open();
this.emitter = new Observable(subscriber => {
ifstream.onData(chunk => {
// chunk will be a string when encoding is 'utf8'
logging.logWithTimestamp(`Received [${chunk}]`);
subscriber.next(chunk);
});
ifstream.onError(err => {
logging.logWithTimestamp(`oops [${err}]`);
subscriber.error(err);
});
ifstream.onEnd(() => {
subscriber.complete();
});
});
this.rxSubscription = this.emitter
.pipe(
concatMap(value =>
this.handleUpdatedValuesComingFromCSVFile(value)
)
)
.subscribe();
2 ответа
Не слишком знаком с rn-fetch-blob
но, надеюсь, вы уловили идею, вы также возвращаете функцию для запуска логики очистки.
const onDataObserevable=new Obserevable(obs=>{
ifstream.onData(data=>obs.next(data))
return ()=>{... you can add some clean up logic here like unsubcribe from source onData event}
});
ОБНОВИТЬ
преобразовал всю цепочку в наблюдаемую, надеюсь, вы уловили идею
from(RNFetchBlob.fs.readStream(
peripheral.name,
"utf8",
-1,
10
))
.pipe(mergeMap(ifstream => {
ifstream.open();
return new Observable(subscriber => {
ifstream.onData(chunk => {
// chunk will be a string when encoding is 'utf8'
logging.logWithTimestamp(`Received [${chunk}]`);
subscriber.next(chunk);
});
ifstream.onError(err => {
logging.logWithTimestamp(`oops [${err}]`);
subscriber.error(err);
});
ifstream.onEnd(() => {
subscriber.complete();
});
});),
mergeMap(value =>this.handleUpdatedValuesComingFromCSVFile(value)
)
)
Вот как это сделать с fromEventPattern
:
this.emitter = fromEventPattern(
// tell fromEventPattern how to subscribe to the data
handler => ifstream.onData(handler),
// tell fromEventPattern how to unsubscribe from the data
handler => ifstream.onData(null)
)