Как связать вызовы API с помощью rxjs?

Я понимаю, что есть много вопросов по теме заголовка, но я не смог найти четкий и краткий вопрос/ответ, который помог бы мне в дальнейшем.

Предположим, у нас есть x вызовов веб-API, которые возвращают наблюдаемые объекты Observable<cs[]>... См.

        init() {
    this.service.serviceA().subscribe({
      next: (as) => {
        this.as = as;
        this.service.serviceB().subscribe({
          next: (bs) => {
            this.bs = bs;
            this.service.serviceC().subscribe({
              next: (cs) => {
                this.cs = cs;
                this.restructureABCs();
              },
              error: (e) => this.notification.showError('Failed to load cs! Error: ' + e.messageerror)
            });
          },
          error: (e) => this.notification.showError('Failed to load bs! Error: ' + e.messageerror)
        });
      },
      error: (e) => this.notification.showError('Failed to load as! Error: ' + e.messageerror)
    })
  }

Функция restructureABCs() зависит от as/bs и cs. так как bs и cs не зависят друг от друга. Текущая реализация уродлива и может быть улучшена, но я не уверен, какой оператор мне следует использовать. Мне не имеет смысла использовать concatMap, поскольку (согласно моему, возможно ошибочному пониманию) несколько потоков будут объединены в один, чего я не хочу. Все, что мне нужно, это убедиться, что as/bs и cs загружены до вызова функции restructureABCs() .

угловой13, rxjs7.4

2 ответа

Если BS, CS, AS не зависят друг от друга, они могут работать параллельно, все, что вам нужно, это убедиться, что все эти Observables завершены для запуска restructureABCs(). Таким образом, вы можете использовать combLatest/forkJoin для этого случая.

      combineLatest([
   this.service.serviceA().pipe(catchError(er => ...)),
   this.service.serviceB().pipe(catchError(er => ...)),
   this.service.serviceC().pipe(catchError(er => ...))
]).subscribe(response => {
      this.as = response[0];
      this.bs = response[1];
      this.cs = response[2];

      restructureABCs()
})

Вот решение, которое я придумал, используя forkJoin.

      as : Array<as>;
bs : Array<bs>;
cs : Array<cs>;
as$: Observable<as[]> = this.service.serviceA();
bs$: Observable<bs[]> = this.service.serviceB();
cs$: Observable<cs[]> = this.service.serviceC();

init() {


forkJoin([
      this.as$.pipe(catchError(er => of({errMsg: 'Failed to load as! Error: ', error: er}))),
      this.bs$.pipe(catchError(er => of({errMsg: 'Failed to load bs! Error: ', error: er}))),
      this.cs$.pipe(catchError(er => of({errMsg: 'Failed to load cs! Error: ', error: er})))
    ])
      .subscribe({
        next: ([a, b, c]) => {
          this.as = a as Array<as>;
          this.bs = b as Array<bs>;
          this.cs = c as Array<cs>;
          this.restructureABCs();
        },
        error: (e) => this.notification.showError(e.errMsg)
      });
}

Нашел полезную статью об обработке ошибок в forkjoin:

Другие вопросы по тегам