Наблюдаемая нумерация страниц
Во-первых: это первый проект, в котором я использую RxJ, я думал, что лучше всего буду его использовать.
Я нашел этот ответ: Превращение разбитых на страницы запросов в Observable поток с RxJs Но в комментариях говорится:
Вы по-прежнему превышаете максимальный стек вызовов. На отметке 430 страниц вернулись. Я думаю, что рекурсия не может быть лучшим решением здесь
Я хочу запросить API данных Youtube, результаты возвращаются на страницах, и мне нужно разбить их на страницы. Я представлял себе, что такой рабочий процесс может работать: 1) Инициировать вызов. 2) Проверить, есть ли в ответе значение "nextPageToken". 3) Если это так, сделать еще один запрос к API YouTube. 4) Если нет, закончить.
So to do this I could Imagine the following Observables / streams:
FirstRequestStream -A-X--------------->
ResponseStream -A-A-A-A--X-------->
RequestStream -I-A-I-A----------->
A = Action
I = Info from upper stream
X = Termination
(Не уверен, что эта диаграмма верна так, как я это сделал)
Таким образом, ResponseStream зависит от FirstRequestStream и RequestStream(используя функцию слияния). RequestStream зависит от ResponseStream(это называется циркулирующей наблюдаемой?)
- Это правильный подход?
- Являются ли "циркулирующие наблюдаемые" хорошей вещью, возможны ли они вообще?(У меня были проблемы с их созданием).
-В любом другом случае я должен попробовать первым?
- Можно ли создавать взаимозависимые наблюдаемые потоки?
Спасибо за помощь.
4 ответа
Вы слишком усложняете эту проблему, ее можно решить намного проще, используя оператор defer.
Идея состоит в том, что вы создаете отложенную наблюдаемую (поэтому она будет создана и начнет извлекать данные только после подписки) и объедините ее с той же наблюдаемой, но для следующей страницы, которая также будет объединена со следующей страницей и т. Д.... И все это можно сделать без рекурсии.
Вот как выглядит код:
const { defer, from, concat, EMPTY, timer } = rxjs; // = require("rxjs")
const { mergeMap, take, mapTo, tap } = rxjs.operators; // = require("rxjs/operators")
// simulate network request
function fetchPage(page=0) {
return timer(100).pipe(
tap(() => console.log(`-> fetched page ${page}`)),
mapTo({
items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
nextPage: page + 1,
})
);
}
const getItems = page => defer(() => fetchPage(page)).pipe(
mergeMap(({ items, nextPage }) => {
const items$ = from(items);
const next$ = nextPage ? getItems(nextPage) : EMPTY;
return concat(items$, next$);
})
);
// process only first 30 items, without fetching all of the data
getItems()
.pipe(take(30))
.subscribe(e => console.log(e));
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>
Вот мое решение с использованием операторов rxjs expand
, reduce
а также empty
с помощью модуля HttpClient:
Предположим, ваш ответ API - это объект, имеющий форму следующего вида
interface Response {
data: items[]; // array of result items
next: string|null; // url of next page, or null if there are no more items
}
Вы можете использовать расширение и уменьшение так
getAllResults(url) {
return this.http.get(url).pipe(
expand((res) => res.next ? this.http.get(res.next) : EMPTY),
reduce((acc, res) => acc.concat(res.data), [])
);
}
Я бесстыдно повторно использую фрагмент кода от Олеса Савлука, с его хорошим fetchPage
функции, и я применяю идеи, изложенные в статье блога, на которую ссылается Picci (в комментариях), используя expand
,
Статья о экспансии Николаса Джеймисона
Это дает немного более простой код с рекурсией, скрытой в expand
вызов (и комментарии к статье показывают, как линеаризовать его, если это необходимо).
const { from, concat, empty, timer } = rxjs; // = require("rxjs")
const { concatMap, expand, mapTo, tap, toArray } = rxjs.operators; // = require("rxjs/operators")
// simulate network request
const pageNumber = 3;
function fetchPage(page = 0) {
return timer(1000).pipe(
tap(() => console.log(`-> fetched page ${page}`)),
mapTo({
items: Array.from({ length: 10 }).map((_, i) => page * 10 + i),
nextPage: ++page === pageNumber ? undefined : page,
}),
);
}
const list = fetchPage().pipe(
expand(({ nextPage }) => nextPage ? fetchPage(nextPage) : empty()),
concatMap(({ items }) => items),
// Transforms the stream of numbers (Observable<number>)
// to a stream with only an array of numbers (Observable<number[]>).
// Remove if you want a stream of numbers, not waiting for all requests to complete.
toArray(),
);
list.subscribe((items) => console.log(items));
<script src="https://unpkg.com/rxjs@6.2.2/bundles/rxjs.umd.min.js"></script>
LuJaks - это, безусловно, самый простой подход!
В качестве более простого примера предположим, что у вас есть функция, которая выполняет HTTP-запрос для данной страницы и возвращает (частичный) массив данных. Мы вызываем эту функцию до тех пор, пока сервер не вернет пустой массив:
import { EMPTY, of } from "rxjs";
import { expand, mergeAll } from "rxjs/operators";
function httpGet(p): Observable<number[]> {
if (p > 5) {
return of([]);
}
return of(new Array(10).fill(0).map((_, i) => p * 10 + i));
}
let page = 0;
httpGet(page++).pipe( // get the fist page
expand((value, index) => (value.length > 0 ? httpGet(++page) : EMPTY)), // other pages
mergeAll(), // and consolidate into a single array as if there was only one call
).subscribe((x) => console.log(x));