RXJS расширить оператор
var offset = 1;
var limit = 500;
var list = new Promise(function (resolve, reject) {
rets.getAutoLogoutClient(config.clientSettings, (client) => {
var results = client.search.query(SearchType, Class, Query, {
limit: limit,
offset: offset
});
resolve(results);
});
});
var source = Rx.Observable.fromPromise(list);
source.subscribe(results => console.log(results.count));
Я делаю сайт по недвижимости, используя RETS.
То, что я пытаюсь сделать, мой запрос ограничен сервером RETS, это запустить его в цикле, увеличивая смещение до тех пор, пока у меня не будут все мои данные. Я не знаю, что это за счет, пока я не выполню запрос и не найду значение.
Я пытался использовать расширение, но я понятия не имею, как именно это работает. Попытался сделать это несколькими способами, даже используя старомодный цикл while, который пока не работает с методом.then. Так что я обратился к RXJS, так как я использовал его в Angular 4.
Это сделано в экспресс. Мне нужно в конечном итоге запустить задания кукурузы, чтобы получить обновленные свойства, но моя проблема заключается в получении всех данных и увеличении смещения каждый раз, если число превышает мое смещение. Так, например, запустите запрос со смещением 1 с пределом 500. Итого здесь 1690. Итак, следующий обход моего смещения будет:
offset += limit
Как только у меня есть данные, мне нужно сохранить их в MongoDB. Что я уже смог успешно сделать. Он просто находит способ получить все данные без необходимости вручную устанавливать смещение.
Обратите внимание, что ограничение сервера составляет 2500, да, я могу получить все это за один раз, но есть и другие данные, такие как медиа, которые могут иметь более 2500.
Какие-либо предложения?
2 ответа
На самом деле это довольно распространенный вариант использования RxJS, так как существует множество разбитых на страницы источников данных или источников, которые иначе ограничены в том, что вы можете запросить за один раз.
Мои два цента
На мой взгляд expand
вероятно, лучший оператор для этого, учитывая, что вы разбиваете на страницы на неизвестном источнике данных, и вам требуется по крайней мере один запрос для определения окончательного числа. Если бы вы знали, сколько данных вы собираетесь запрашивать, более простым вариантом было бы использовать что-то вроде mergeScan
Но я отвлекся.
Предложенное решение
Это может занять немного усилий, чтобы обернуть вашу голову, поэтому я добавил аннотации, где это возможно, чтобы понять, как все это работает. Обратите внимание, что я на самом деле не проверял это, так что извините за любые синтаксические ошибки.
// Your constant limit for any one query
const limit = 500;
// RxJS helper method that wraps the async call into an Observable
// I am basing this on what I saw of your sample which leads me to believe
// that this should work.
const clientish = Rx.Observable.bindCallback(rets.getAutoLogoutClient);
// A method wrapper around your query call that wraps the resulting promise
// into a defer.
const queryish = (client, params) =>
// Note the use of defer here is deliberate, since the query returns
// a promise that will begin executing immediately, this prevents that behavior
// And forces execution on subscription.
Rx.Observable.defer(() => client.search.query(SearchType, Class, Query, params));
// This does the actual expansion function
// Note this is a higher order function because the client and the parameters
// are available at different times
const expander = (client) => ({limit, count}) =>
// Invoke the query method
queryish(client, {limit, count})
// Remap the results, update offset and count and forward the whole
// package down stream
.map(results => ({
limit,
count: results.count,
offset: offset + limit,
results
}));
// Start the stream by constructing the client
clientish(config.clientSettings)
.switchMap(client =>
// This are the arguments for the initial call
Rx.Observable.of({limit, offset: 0})
// Call the expander function with the client
// The second argument is the max concurrency, you can change that if needed
.expand(expander(client), 1)
// Expand will keep recursing unless you tell it to stop
// This will halt the execution once offset exceeds count, i.e. you have
// all the data
.takeWhile(({count, offset}) => offset < count)
// Further downstream you only care about the results
// So extract them from the message body and only forward them
.pluck('results')
)
.subscribe(results => /*Do stuff with results*/);
const retsConnect = Rx.Observable.create(function(observer) {
rets.getAutoLogoutClient(config.clientSettings, client => {
return searchQuery(client, 500, 1, observer);
});
});
function searchQuery(client, limit, offset, observer) {
let currentOffset = offset === undefined || offset === 0 ? 1 : offset;
return client.search.query(SearchType, Class, Query, {limit: limit, offset: currentOffset})
.then(results => {
offset += limit;
observer.next(results.maxRowsExceeded);
if (results.maxRowsExceeded) {
console.log(offset);
return searchQuery(client, limit, offset, observer);
} else {
console.log('Completed');
observer.complete();
}
});
}
retsConnect.subscribe(val => console.log(val));
Так что это где-то с тем, что я пытался здесь. Я все еще в процессе настройки этого. Итак, что я хочу сделать, это сломать searchQuery больше. Не уверен, стоит ли мне передавать туда наблюдатель, так что я собираюсь выяснить, где сопоставить, и взять до тех пор, пока не вернем searchQuery. Я не уверен, что takeUntil примет истину или ложь, хотя. Все, что мне нужно, это сохранить данные в mongodb. Так что, думаю, я мог бы оставить все как есть и добавить туда свой метод сохранения, но я все еще хотел бы выяснить это.
Примечание: results.maxRowsExceeded возвращает true, когда еще есть данные. Поэтому, как только maxRows вернет false, он остановится и все данные будут получены.