RxJS: MergeMap с сохранением порядка ввода

Требование:

urls = [url1, url2, url3]

Запустите все 3 URL параллельно и раскрасьте Дом в последовательности списка URL.

 ex: Finished order of urls = [url3, url1, url2]
     when url1 finishes Immediately render the DOM, without waiting for url2
     If url2, url3 finishes before url1, then store url2, url3 and paint the DOM after url1 arrives
     Paint the DOM with order [url1, url2, url3]

Моя работа с использованием обещаний:

// Fired all 3 urls at the same time
p1 = fetch(url1)
p2 = fetch(url2)
p3 = fetch(url3)

p1.then(updateDom)
  .then(() => p2)
  .then(updateDom)
  .then(() => p3)
  .then(updateDom)

Я хотел сделать то же самое в Observables.

from(urls)
  .pipe(
      mergeMap(x => fetch(x))
  )

Чтобы запустить их параллельно, я использовал карту слияния, но как я могу упорядочить последовательность результатов?

2 ответа

Решение

Я не смог найти ничего, что сохранило бы порядок, поэтому я придумал что-то немного запутанное.

const { concat, of, BehaviorSubject, Subject } = rxjs;
const { delay, filter } = rxjs.operators;

const parallelExecute = (...obs$) => {
  const subjects = obs$.map(o$ => {
    const subject$ = new BehaviorSubject();
    const sub = o$.subscribe(o => { subject$.next(o); });
    return { sub: sub, obs$: subject$.pipe(filter(val => val)) };
  });
  const subject$ = new Subject();
  sub(0);
  function sub(index) {
    const current = subjects[index];
    current.obs$.subscribe(c => {
      subject$.next(c);
      current.obs$.complete();
      current.sub.unsubscribe();
      if (index < subjects.length -1) {
        sub(index + 1);
      } else {
        subject$.complete();
      }
    });
  }
  return subject$;
}


parallelExecute(
  of(1).pipe(delay(3000)),
  of(2).pipe(delay(2000)),
  of(3).pipe(delay(1000))
).subscribe(result => { console.log(result); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>

Лучший способ сохранить порядок в таких асинхронных задачах - использовать concatMap.

Проблема в том, что если мы применим только это, мы потеряем распараллеливание. Если бы мы сделали что-то вроде этого:

from(urls)
  .pipe(
      concatMap(x => fetch(x))
  )

второй запрос не запускается, пока не будет выполнен первый.

Мы можем обойти это, разделив карту на отдельный оператор:

from(urls)
  .pipe(
      map(x => fetch(x)),
      concatMap(x => x)
  )

Все запросы будут запущены одновременно, а результаты будут отправлены в порядке запроса.

См . Пример Адриана, адаптированный для использования этого подхода, ниже:

const { from } = rxjs;
const { concatMap, map } = rxjs.operators;

function delayPromise(val, delay) {
  return new Promise(res => setTimeout(() => res(val), delay));
}

var delay = 3;


from([1, 2, 3]).pipe(
  map(x => delayPromise(x, delay-- * 1000)),
  concatMap(x => x)
).subscribe(result => { console.log(result); });
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"></script>

Вы можете сформировать последовательность с помощью fetch и paint, а затем forkJoin/Promise.all их

p1 = fetch(url1)
p2 = fetch(url2)
p3 = fetch(url3)

forkJoin(
from(p1).pipe(tap(_=>paint dom...))
from(p1).pipe(tap(_=>paint dom...))
from(p1).pipe(tap(_=>paint dom...))
).subscribe()
Другие вопросы по тегам