rxjs - `share` не работает должным образом
У меня есть следующий вспомогательный оператор rxjs:
import { share } from 'rxjs/operators';
export const shareResetOnError = <T>() => share<T>({
resetOnError: true,
resetOnComplete: false
});
У меня также есть следующая спецификация для этого оператора:
import { Observable } from 'rxjs';
import { shareResetOnError } from './rxjs';
fdescribe('shareResetOnError', () => {
it('should share last emitted value', async () => {
const expectedValue = 123;
let count = 0;
const observable = new Observable(subscriber => {
count++;
subscriber.next(-expectedValue);
subscriber.next(expectedValue);
subscriber.complete();
}).pipe(shareResetOnError());
for (let i = 0; i < 3; i++) {
await expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue);
}
expect(count).toBe(1);
});
it('should reset value on error', async () => {
const expectedError = new Error('test');
const expectedValue = 123;
let expectError = true;
let errorsCount = 0;
let valuesCount = 0;
const observable = new Observable(subscriber => {
if (expectError) {
errorsCount++;
subscriber.error(expectedError);
} else {
valuesCount++;
subscriber.next(expectedValue);
}
subscriber.complete();
}).pipe(shareResetOnError());
for (let i = 0; i < 4; i++) {
await expectAsync(observable.toPromise()).toBeRejectedWithError(expectedError.message);
}
expect(errorsCount).toBe(4);
expectError = false;
for (let i = 0; i < 3; i++) {
await expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue);
}
expect(valuesCount).toBe(1);
});
});
По какой-то причине
expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue)
терпит неудачу, потому что
observable
разрешается в undefined вместо
expectedValue
. я тоже пробовал
lastValueFrom
вместо
toPromise
но это не имеет значения. Перед переходом с rxjs 6 на 7 у меня было следующее определение для
shareResetOnError
:
import { AsyncSubject, ConnectableObservable, Observable, pipe, Subscription } from 'rxjs';
import { refCount } from 'rxjs/operators';
function publishLastResetOnError<T>() {
return (source: Observable<T>) => {
let subject: AsyncSubject<T>;
let subscription: Subscription;
resetSubject();
return new ConnectableObservable(source, () => subject);
function resetSubject() {
subscription?.unsubscribe();
subject = new AsyncSubject<T>();
subscription = subject.subscribe({
error: resetSubject
});
}
};
}
export const shareResetOnError = <T>() => pipe(publishLastResetOnError<T>(), refCount());
Он работал, как и ожидалось, и спецификация не подвела. Почему
observable.toPromise()
не разрешено ожидаемое значение с оператором rxjs 7?