Как контролировать давление множественных вызовов ajax с RxJS

Используя RxJS 5, я хочу решить следующее:

Допустим, я получаю список категорий видов из REST API.

Основываясь на каждой из этих категорий, я хочу получить подкатегории из другой конечной точки REST.

Затем на основе каждой из этих подкатегорий я хочу получить продукты, а для каждого из этих продуктов нам нужно получить подробное описание.

Это я решил. Проблема заключается в том, что количество вызовов ajax возрастает, и менее чем за минуту выполняется более 30 000 вызовов, что приводит к тому, что сервер становится на колени.

Теперь, так как это ночная работа, я согласен, что она занимает некоторое время, пока она успешно завершается.

Вот что у меня есть:

getCategories() // Wraps ajax call and returns payload with array of categories
      .switchMap(categories => Observable.from(categories))
      .mergeMap(category => 
        getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
          .catch(err => {
            console.error('Error when fetching sub categories for category:', category);
            console.error(err);
            return Observable.empty();
          })
      )
      .mergeMap(subCategories => Observable.from(subCategories))
      .mergeMap(subCategory => 
        getProducts(subCategory) // Wraps ajax call and returns payload with array of products
        .catch(err => {
          console.error('Error when fetching products for sub category:', subCategory);
          console.error(err);
          return Observable.empty();
        })
      )
      .mergeMap(products => Observable.from(products))
      .mergeMap(product => 
        getProductDetails(product) // Wraps ajax call and returns payload with product details
        .catch(err => {
          console.error('Error when fetching product details for product:', product);
          console.error(err);
          return Observable.empty();
        })
      )
      .mergeMap(productDetails => saveToDb(productDetails))
      .catch(err => {
        console.error(err);
      })
      .subscribe();

После первоначального запроса, где я выбираю категории, я хочу:

Каждый вызов для получения подкатегорий должен ждать до завершения предыдущего. Только 5 вызовов ajax должны быть сделаны одновременно при получении продуктов и деталей этих продуктов. После того, как эти 5 звонков сделаны, мы запускаем следующие 5 звонков и т. Д.

В качестве альтернативы это может контролироваться со временем, как в ожидании х секунд, прежде чем мы сделаем следующий вызов AJAX и т.д.

Как мне решить эту проблему с помощью RxJS, основываясь на моем примере выше?

1 ответ

Решение

mergeMap имеет перегрузку, которая принимает необязательный параметр параллелизма. Это вы можете использовать, чтобы контролировать количество одновременных запросов на ваш сервер.

public mergeMap(project: function(value: T, ?index: number): ObservableInput, resultSelector: function(outerValue: T, innerValue: I, outerIndex: number, innerIndex: number): any, concurrent: number): Observable

  .mergeMap(category => 
    getSubCategories(category) // Wraps ajax call and returns payload with array of sub categories
      .catch(err => {
        console.error('Error when fetching sub categories for category:', category);
        console.error(err);
        return Observable.empty();
      }),
    null, 
    5 /* concurrency */
  )

Также; mergeMap преобразует что-либо observableLike в Observables, так что если ваш getSubCategories() возвращает массив, который будет автоматически преобразован в наблюдаемый, не более Observable.from() нужно.

Другие вопросы по тегам