Наблюдаемая нумерация страниц

Во-первых: это первый проект, в котором я использую RxJ, я думал, что лучше всего буду его использовать.

Я нашел этот ответ: Превращение разбитых на страницы запросов в Observable поток с RxJs Но в комментариях говорится:

Вы по-прежнему превышаете максимальный стек вызовов. На отметке 430 страниц вернулись. Я думаю, что рекурсия не может быть лучшим решением здесь

Я хочу запросить API данных Youtube, результаты возвращаются на страницах, и мне нужно разбить их на страницы. Я представлял себе, что такой рабочий процесс может работать: 1) Инициировать вызов. 2) Проверить, есть ли в ответе значение "nextPageToken". 3) Если это так, сделать еще один запрос к API YouTube. 4) Если нет, закончить.

So to do this I could Imagine the following Observables / streams:
FirstRequestStream -A-X--------------->
ResponseStream     -A-A-A-A--X-------->
RequestStream      -I-A-I-A----------->
A = Action
I = Info from upper stream
X = Termination

(Не уверен, что эта диаграмма верна так, как я это сделал)

Таким образом, ResponseStream зависит от FirstRequestStream и RequestStream(используя функцию слияния). RequestStream зависит от ResponseStream(это называется циркулирующей наблюдаемой?)

- Это правильный подход?

- Являются ли "циркулирующие наблюдаемые" хорошей вещью, возможны ли они вообще?(У меня были проблемы с их созданием).

-В любом другом случае я должен попробовать первым?

- Можно ли создавать взаимозависимые наблюдаемые потоки?

Спасибо за помощь.

4 ответа

Решение

Вы слишком усложняете эту проблему, ее можно решить намного проще, используя оператор defer.

Идея состоит в том, что вы создаете отложенную наблюдаемую (поэтому она будет создана и начнет извлекать данные только после подписки) и объедините ее с той же наблюдаемой, но для следующей страницы, которая также будет объединена со следующей страницей и т. Д.... И все это можно сделать без рекурсии.

Вот как выглядит код:

const { defer, from, concat, EMPTY, timer } = rxjs; // = require("rxjs")
const { mergeMap, take, mapTo, tap } = rxjs.operators; // = require("rxjs/operators")

// simulate network request
function fetchPage(page=0) {
  return timer(100).pipe(
    tap(() => console.log(`-> fetched page ${page}`)),
    mapTo({
      items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
      nextPage: page + 1,
    })
  );
}

const getItems = page => defer(() => fetchPage(page)).pipe(
  mergeMap(({ items, nextPage }) => {
    const items$ = from(items);
    const next$ = nextPage ? getItems(nextPage) : EMPTY;
    return concat(items$, next$);
  })
);

// process only first 30 items, without fetching all of the data
getItems()
 .pipe(take(30))
 .subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>

Вот мое решение с использованием операторов rxjs expand, reduce а также empty с помощью модуля HttpClient:

Предположим, ваш ответ API - это объект, имеющий форму следующего вида

interface Response {
  data: items[]; // array of result items
  next: string|null; // url of next page, or null if there are no more items
}

Вы можете использовать расширение и уменьшение так

getAllResults(url) {
  return this.http.get(url).pipe(
    expand((res) => res.next ? this.http.get(res.next) : EMPTY),
    reduce((acc, res) => acc.concat(res.data), [])
  );
}

Я бесстыдно повторно использую фрагмент кода от Олеса Савлука, с его хорошим fetchPage функции, и я применяю идеи, изложенные в статье блога, на которую ссылается Picci (в комментариях), используя expand,

Статья о экспансии Николаса Джеймисона

Это дает немного более простой код с рекурсией, скрытой в expand вызов (и комментарии к статье показывают, как линеаризовать его, если это необходимо).

const { from, concat, empty, timer } = rxjs; // = require("rxjs")
const { concatMap, expand, mapTo, tap, toArray } = rxjs.operators; // = require("rxjs/operators")

// simulate network request
const pageNumber = 3;
function fetchPage(page = 0) {
return timer(1000).pipe(
    tap(() => console.log(`-> fetched page ${page}`)),
    mapTo({
        items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
        nextPage: ++page === pageNumber ? undefined : page,
    }),
);
}

const list = fetchPage().pipe(
expand(({ nextPage }) => nextPage ? fetchPage(nextPage) : empty()),
concatMap(({ items }) => items),
// Transforms the stream of numbers (Observable<number>)
// to a stream with only an array of numbers (Observable<number[]>).
// Remove if you want a stream of numbers, not waiting for all requests to complete.
toArray(),
);

list.subscribe((items) => console.log(items));
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>

LuJaks - это, безусловно, самый простой подход!

В качестве более простого примера предположим, что у вас есть функция, которая выполняет HTTP-запрос для данной страницы и возвращает (частичный) массив данных. Мы вызываем эту функцию до тех пор, пока сервер не вернет пустой массив:

      import { EMPTY, of } from "rxjs";
import { expand, mergeAll } from "rxjs/operators";

function httpGet(p): Observable<number[]> {
  if (p > 5) {
    return of([]);
  }
  return of(new Array(10).fill(0).map((_, i) => p * 10 + i));
}

let page = 0;

httpGet(page++).pipe( // get the fist page
    expand((value, index) => (value.length > 0 ? httpGet(++page) : EMPTY)), // other pages
    mergeAll(),       // and consolidate into a single array as if there was only one call
  ).subscribe((x) => console.log(x));
Другие вопросы по тегам