Превращение разбитых на страницы запросов в наблюдаемый поток с RxJs
У меня есть сервис, который возвращает данные в страницах. Ответ на одну страницу содержит подробную информацию о том, как запросить следующую страницу.
Мой подход состоит в том, чтобы вернуть данные ответа, а затем немедленно объединить отложенный вызов в ту же наблюдаемую последовательность, если доступно больше страниц.
function getPageFromServer(index) {
// return dummy data for testcase
return {nextpage:index+1, data:[1,2,3]};
}
function getPagedItems(index) {
return Observable.return(getPageFromServer(index))
.flatMap(function(response) {
if (response.nextpage !== null) {
return Observable.fromArray(response.data)
.concat(Observable.defer(function() {return getPagedItems(response.nextpage);}));
}
return Observable.fromArray(response.data);
});
}
getPagedItems(0).subscribe(
function(item) {
console.log(new Date(), item);
},
function(error) {
console.log(error);
}
)
Это должен быть неправильный подход, потому что в течение 2 секунд вы получите:
throw e;
^
RangeError: Maximum call stack size exceeded
at CompositeDisposablePrototype.dispose (/Users/me/node_modules/rx/dist/rx.all.js:654:51)
Как правильно подходить к нумерации страниц?
3 ответа
РЕДАКТИРОВАТЬ Ах! Я вижу проблему, с которой вы сталкиваетесь. Небольшая оптимизация вызовов должна исправить вас:
function mockGetPageAjaxCall(index) {
// return dummy data for testcase
return Promise.resolve({nextpage:index+1, data:[1,2,3]});
}
function getPageFromServer(index) {
return Observable.create(function(obs) {
mockGetPageAjaxCall(index).then(function(page) {
obs.onNext(page);
}).catch(function(err) {
obs.onError(err)
}).finally(function() {
obs.onCompleted();
});
});
}
function getPagedItems(index) {
return Observable.create(function(obs) {
// create a delegate to do the work
var disposable = new SerialDisposable();
var recur = function(index) {
disposable.setDisposable(getPageFromServer(index).retry().subscribe(function(page) {
obs.onNext(page.items);
if(page.nextpage === null) {
obs.onCompleted();
}
// call the delegate recursively
recur(page.nextpage);
}));
};
// call the delegate to start it
recur(0);
return disposable;
});
}
getPagedItems(0).subscribe(
function(item) {
console.log(new Date(), item);
},
function(error) {
console.log(error);
}
)
Глядя на код OP, это действительно правильный метод. Просто нужно сделать ваш фиктивный сервис асинхронным, чтобы имитировать реальные условия. Вот решение, которое позволяет избежать исчерпания стека (я также сделал getPageFromServer
на самом деле вернуть наблюдаемую простуду вместо того, чтобы требовать, чтобы вызывающий ее обернул).
Обратите внимание, что если вы действительно ожидаете, что ваши запросы на обслуживание будут выполняться синхронно в реальном приложении, и, следовательно, вам нужно убедиться, что ваш код не исчерпывает стек, когда это происходит, просто getPagedItems()
вызвать планировщик currentThread. currentThread
планировщик планирует задачи, используя батут для предотвращения рекурсивных вызовов (и исчерпания стека). Смотрите закомментированную строку в конце getPagedItems
function getPageFromServer(index) {
// return dummy data asynchronously for testcase
// use timeout scheduler to force the result to be asynchronous like
// it would be for a real service request
return Rx.Observable.return({nextpage: index + 1, data: [1,2,3]}, Rx.Scheduler.timeout);
// for real request, if you are using jQuery, just use rxjs-jquery and return:
//return Rx.Observable.defer(function () { return $.ajaxAsObservable(...); });
}
function getPagedItems(index) {
var result = getPageFromServer(index)
.retry(3) // retry the call if it fails
.flatMap(function (response) {
var result = Rx.Observable.fromArray(response.data);
if (response.nextpage !== null) {
result = result.concat(getPagedItems(response.nextpage));
}
return result;
});
// If you think you will really satisfy requests synchronously, then instead
// of using the Rx.Scheduler.timeout in getPageFromServer(), you can
// use the currentThreadScheduler here to prevent the stack exhaustion...
// return result.observeOn(Rx.Scheduler.currentThread)
return result;
}
Вот более краткий и ИМХО чистый ответ без какой-либо рекурсии. Он использует ogen(~ 46 loc) для преобразования любого генератора в наблюдаемый.
Он имеет собственную встроенную функцию next, которая будет генерировать данные каждый раз, когда ваша функция что-то даст.
нб: оригинальная статья стоит прочитать
function getPagedItems({offset=0, limit=4}) {
paginatedQueryGenerator = function*(someParams offset, limit) {
let hasMore = true
while(hasMore) {
const results = yield YOUR_PROMISE_BASED_REQUEST(someParams, limit, offset)
hasMore = results && results.nextpage !== null
offset += limit
}
}
return ogen(paginatedQueryGenerator)(someParams, offset, limit)
}
Еще одно решение - использовать retryWhen
getAllData() {
let page = 0;
let result = [];
const getOnePage = () => {
return of(0).pipe(mergeMap(() => getPaginatedData(page++)));
};
return getOnePage()
.pipe(
map(items => {
result = result.concat(items);
if (templates.length === PAGE_SIZE) {
throw 'get next page';
}
}),
retryWhen(e => e.pipe(
takeWhile(er => er === 'get next page'))
),
map(e => result)
)
.subscribe(items => {
console.log('here is all data', items);
});
}