Как обработать RxJS потоком n элементов за раз, и как только элемент будет завершен, снова автоматически заполните n?

У меня есть поток событий, и я хотел бы вызвать функцию, которая возвращает обещание для каждого из этих событий, проблема в том, что эта функция очень дорогая, поэтому я хотел бы обрабатывать не более n событий одновременно.

Эта диаграмма гальки, вероятно, неверна, но это то, что я хотел бы:

---x--x--xxxxxxx-------------x------------->  //Events
---p--p--pppp------p-p-p-----p------------->  //In Progress
-------d--d--------d-d-dd------dddd-------->  //Promise Done

---1--21-2-34-----------3----4-3210--------   //QUEUE SIZE CHANGES

Это код, который я до сих пор:

var n = 4;
var inProgressCount = 0;

var events$ = Rx.Observable.fromEvent(produceEvent, 'click')
  .map((ev) => new Date().getTime());

var inProgress$ = events$.controlled();

var done$ = inProgress$      
  .tap(() => inProgressCount++)
  .flatMap((timestamp) => Rx.Observable.fromPromise(expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)));

done$.subscribeOnNext((timestamp) => {
  inProgressCount--;
  inProgress$.request(Math.max(1, n - inProgressCount));
});

inProgress$.request(n);

Есть два вопроса с этим кодом:

  1. Это использует inProgressCount var, который обновляется функциями побочных эффектов.
  2. Подписка done $ вызывается только один раз, когда я запрашиваю более 1 элемента из контролируемого потока. Это делает inProgressCount var для некорректного обновления, это в конечном итоге ограничивает очередь до одной за раз.

Вы можете увидеть, как это работает здесь: http://jsbin.com/wivehonifi/1/edit?js,console,output

Вопросы:

  1. Есть ли лучший подход?
  2. Как я могу избавиться от inProgressCount переменная?
  3. Почему подписка $ done вызывается только один раз при запросе нескольких товаров?

Обновить:
Ответ на вопрос № 3: switchMap такой же, как flatMapLatest, поэтому я получил только последний. Обновлен код для flatMap вместо switchMap.

1 ответ

Решение

Вам на самом деле не нужно использовать противодавление вообще. Есть оператор под названием flatMapWithMaxConcurrent это делает это для вас. По сути, это псевдоним для вызова .map().merge(concurrency) и это только позволяет максимальному количеству потоков быть в полете за один раз.

Я обновил ваш jsbin здесь: http://jsbin.com/weheyuceke/1/edit?js,output

Но я прокомментировал важный бит ниже:

const concurrency = 4;

var done$ = events$
  //Only allows a maximum number of items to be subscribed to at a time
  .flatMapWithMaxConcurrent(concurrency, 
    ({timestamp}) =>   
      //This overload of `fromPromise` defers the execution of the lambda
      //until subscription                    
      Rx.Observable.fromPromise(() => { 
        //Notify the ui that this task is in progress                                 
        updatePanelAppend(inProgress, timestamp);
        removeFromPanel(pending, timestamp);
        //return the task
        return expensiveComputation(getRandomInt(1, 5) * 1000, timestamp)
     }));
Другие вопросы по тегам