RXJS расширить оператор

var offset = 1;
var limit = 500;

var list = new Promise(function (resolve, reject) {
  rets.getAutoLogoutClient(config.clientSettings, (client) => {
    var results = client.search.query(SearchType, Class, Query, {
      limit: limit,
      offset: offset
    });
    resolve(results);
  });
});

var source = Rx.Observable.fromPromise(list);

source.subscribe(results => console.log(results.count));

Я делаю сайт по недвижимости, используя RETS.

То, что я пытаюсь сделать, мой запрос ограничен сервером RETS, это запустить его в цикле, увеличивая смещение до тех пор, пока у меня не будут все мои данные. Я не знаю, что это за счет, пока я не выполню запрос и не найду значение.

Я пытался использовать расширение, но я понятия не имею, как именно это работает. Попытался сделать это несколькими способами, даже используя старомодный цикл while, который пока не работает с методом.then. Так что я обратился к RXJS, так как я использовал его в Angular 4.

Это сделано в экспресс. Мне нужно в конечном итоге запустить задания кукурузы, чтобы получить обновленные свойства, но моя проблема заключается в получении всех данных и увеличении смещения каждый раз, если число превышает мое смещение. Так, например, запустите запрос со смещением 1 с пределом 500. Итого здесь 1690. Итак, следующий обход моего смещения будет:

offset += limit

Как только у меня есть данные, мне нужно сохранить их в MongoDB. Что я уже смог успешно сделать. Он просто находит способ получить все данные без необходимости вручную устанавливать смещение.

Обратите внимание, что ограничение сервера составляет 2500, да, я могу получить все это за один раз, но есть и другие данные, такие как медиа, которые могут иметь более 2500.

Какие-либо предложения?

2 ответа

На самом деле это довольно распространенный вариант использования RxJS, так как существует множество разбитых на страницы источников данных или источников, которые иначе ограничены в том, что вы можете запросить за один раз.

Мои два цента

На мой взгляд expand вероятно, лучший оператор для этого, учитывая, что вы разбиваете на страницы на неизвестном источнике данных, и вам требуется по крайней мере один запрос для определения окончательного числа. Если бы вы знали, сколько данных вы собираетесь запрашивать, более простым вариантом было бы использовать что-то вроде mergeScanНо я отвлекся.

Предложенное решение

Это может занять немного усилий, чтобы обернуть вашу голову, поэтому я добавил аннотации, где это возможно, чтобы понять, как все это работает. Обратите внимание, что я на самом деле не проверял это, так что извините за любые синтаксические ошибки.

// Your constant limit for any one query
const limit = 500;

// RxJS helper method that wraps the async call into an Observable
// I am basing this on what I saw of your sample which leads me to believe
// that this should work. 
const clientish = Rx.Observable.bindCallback(rets.getAutoLogoutClient);

// A method wrapper around your query call that wraps the resulting promise
// into a defer.
const queryish = (client, params) =>
  // Note the use of defer here is deliberate, since the query returns
  // a promise that will begin executing immediately, this prevents that behavior
  // And forces execution on subscription.
  Rx.Observable.defer(() => client.search.query(SearchType, Class, Query, params));

// This does the actual expansion function
// Note this is a higher order function because the client and the parameters
// are available at different times
const expander = (client) => ({limit, count}) => 
  // Invoke the query method
  queryish(client, {limit, count})
    // Remap the results, update offset and count and forward the whole 
    // package down stream
    .map(results => ({
      limit, 
      count: results.count, 
      offset: offset + limit, 
      results
    }));


// Start the stream by constructing the client
clientish(config.clientSettings)
  .switchMap(client =>
     // This are the arguments for the initial call
     Rx.Observable.of({limit, offset: 0})
       // Call the expander function with the client
       // The second argument is the max concurrency, you can change that if needed
       .expand(expander(client), 1)

       // Expand will keep recursing unless you tell it to stop
       // This will halt the execution once offset exceeds count, i.e. you have
       // all the data
       .takeWhile(({count, offset}) => offset < count)

       // Further downstream you only care about the results
       // So extract them from the message body and only forward them
       .pluck('results')
  )
  .subscribe(results => /*Do stuff with results*/);
const retsConnect = Rx.Observable.create(function(observer) {
  rets.getAutoLogoutClient(config.clientSettings, client => {
    return searchQuery(client, 500, 1, observer);
  });
});

function searchQuery(client, limit, offset, observer) {
  let currentOffset = offset === undefined || offset === 0 ? 1 : offset;
  return client.search.query(SearchType, Class, Query, {limit: limit, offset: currentOffset})
    .then(results => {
      offset += limit;
      observer.next(results.maxRowsExceeded);
      if (results.maxRowsExceeded) {
        console.log(offset);
        return searchQuery(client, limit, offset, observer);
      } else {
        console.log('Completed');
        observer.complete();
      }
    });
}

retsConnect.subscribe(val => console.log(val));

Так что это где-то с тем, что я пытался здесь. Я все еще в процессе настройки этого. Итак, что я хочу сделать, это сломать searchQuery больше. Не уверен, стоит ли мне передавать туда наблюдатель, так что я собираюсь выяснить, где сопоставить, и взять до тех пор, пока не вернем searchQuery. Я не уверен, что takeUntil примет истину или ложь, хотя. Все, что мне нужно, это сохранить данные в mongodb. Так что, думаю, я мог бы оставить все как есть и добавить туда свой метод сохранения, но я все еще хотел бы выяснить это.

Примечание: results.maxRowsExceeded возвращает true, когда еще есть данные. Поэтому, как только maxRows вернет false, он остановится и все данные будут получены.

Другие вопросы по тегам