Оператор RxJava, который динамически буферизует элементы с обратным давлением и генерирует их пакетами
У меня есть Flowable, который испускает события, которые должны быть обработаны дорогой операцией, которая ожидает массивы элементов:
Flowable<T> src
void expensiveOp(List<T> batch)
Помимо использования постоянного окна, я хотел бы указать окно с максимальным количеством элементов, которое заполняется, когда нисходящий поток занят, а при заполнении просто обратные давления:
int maxSize = 1024
src.dynamicWindow(maxSize).subscribe(expensiveOp)
Поэтому размер окна не должен зависеть ни от времени, ни от элемента, а от противодавления. Буфер должен быть очищен, когда подписчик готов обработать следующий элемент.
Какой перегруженный метод мне не хватает?
Возможными расширениями будут параметр minSize и механизм повторных попыток, который повторяет попытку с увеличенным окном.
0 ответов
I've encountered the problem recently and here's my original answer: /questions/36761947/rxjava-buffer-so-vremenem-kotoroe-uchityivaet-obratnoe-davlenie/55142622#55142622
For a quick answer. My implementation to support backpressure with time and count based buffer: https://gist.github.com/driventokill/c49f86fb0cc182994ef423a70e793a2d