rxjs - `share` не работает должным образом

У меня есть следующий вспомогательный оператор rxjs:

      import { share } from 'rxjs/operators';

export const shareResetOnError = <T>() => share<T>({
  resetOnError: true,
  resetOnComplete: false
});

У меня также есть следующая спецификация для этого оператора:

      import { Observable } from 'rxjs';
import { shareResetOnError } from './rxjs';

fdescribe('shareResetOnError', () => {
  it('should share last emitted value', async () => {
    const expectedValue = 123;

    let count = 0;
    const observable = new Observable(subscriber => {
      count++;
      subscriber.next(-expectedValue);
      subscriber.next(expectedValue);
      subscriber.complete();
    }).pipe(shareResetOnError());

    for (let i = 0; i < 3; i++) {
      await expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue);
    }
    expect(count).toBe(1);
  });

  it('should reset value on error', async () => {
    const expectedError = new Error('test');
    const expectedValue = 123;

    let expectError = true;

    let errorsCount = 0;
    let valuesCount = 0;

    const observable = new Observable(subscriber => {
      if (expectError) {
        errorsCount++;
        subscriber.error(expectedError);
      } else {
        valuesCount++;
        subscriber.next(expectedValue);
      }
      subscriber.complete();
    }).pipe(shareResetOnError());

    for (let i = 0; i < 4; i++) {
      await expectAsync(observable.toPromise()).toBeRejectedWithError(expectedError.message);
    }
    expect(errorsCount).toBe(4);

    expectError = false;
    for (let i = 0; i < 3; i++) {
      await expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue);
    }
    expect(valuesCount).toBe(1);
  });
});

По какой-то причине expectAsync(observable.toPromise()).toBeResolvedTo(expectedValue)терпит неудачу, потому что observableразрешается в undefined вместо expectedValue. я тоже пробовал lastValueFromвместо toPromiseно это не имеет значения. Перед переходом с rxjs 6 на 7 у меня было следующее определение для shareResetOnError:

      import { AsyncSubject, ConnectableObservable, Observable, pipe, Subscription } from 'rxjs';
import { refCount } from 'rxjs/operators';

function publishLastResetOnError<T>() {
  return (source: Observable<T>) => {
    let subject: AsyncSubject<T>;
    let subscription: Subscription;
    resetSubject();
    return new ConnectableObservable(source, () => subject);

    function resetSubject() {
      subscription?.unsubscribe();
      subject = new AsyncSubject<T>();
      subscription = subject.subscribe({
        error: resetSubject
      });
    }
  };
}

export const shareResetOnError = <T>() => pipe(publishLastResetOnError<T>(), refCount());

Он работал, как и ожидалось, и спецификация не подвела. Почему observable.toPromise()не разрешено ожидаемое значение с оператором rxjs 7?

0 ответов

Другие вопросы по тегам