Оператор RxJava, который динамически буферизует элементы с обратным давлением и генерирует их пакетами

У меня есть Flowable, который испускает события, которые должны быть обработаны дорогой операцией, которая ожидает массивы элементов:

Flowable<T> src
void expensiveOp(List<T> batch)

Помимо использования постоянного окна, я хотел бы указать окно с максимальным количеством элементов, которое заполняется, когда нисходящий поток занят, а при заполнении просто обратные давления:

int maxSize = 1024
src.dynamicWindow(maxSize).subscribe(expensiveOp)

Поэтому размер окна не должен зависеть ни от времени, ни от элемента, а от противодавления. Буфер должен быть очищен, когда подписчик готов обработать следующий элемент.

Какой перегруженный метод мне не хватает?

Возможными расширениями будут параметр minSize и механизм повторных попыток, который повторяет попытку с увеличенным окном.

0 ответов

I've encountered the problem recently and here's my original answer: /questions/36761947/rxjava-buffer-so-vremenem-kotoroe-uchityivaet-obratnoe-davlenie/55142622#55142622

For a quick answer. My implementation to support backpressure with time and count based buffer: https://gist.github.com/driventokill/c49f86fb0cc182994ef423a70e793a2d

Другие вопросы по тегам