rxJava buffer() со временем, которое учитывает обратное давление
Версии buffer
Оператор, который не работает по времени, учитывает обратное давление согласно JavaDoc:
http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html
Однако любая версия buffer
который включает временные буферы, не поддерживает противодавление, как этот
http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html
Я понимаю, что это происходит из-за того, что, как только время идет, вы не можете остановить его так же, как, например, interval
оператор, который не поддерживает противодавление либо по той же причине.
Мне нужен оператор буферизации, основанный как на размере, так и на времени, и полностью поддерживающий противодавление, передавая сигналы противодавления ОБА вверх по течению и производителю, тикающему время, что-то вроде этого:
someFlowable()
.buffer(
Flowable.interval(1, SECONDS).onBackpressureDrop(),
10
);
Так что теперь я могу снять галочку на сигналах противодавления.
Это в настоящее время достижимо в rxJava2? Как насчет Project-Reactor?
1 ответ
I've encountered the problem recently and here is my implementation. It can be used like this:
Flowable<List<T>> bufferedFlow = (some flowable of T)
.compose(new BufferTransformer(1, TimeUnit.MILLISECONDS, 8))
It supports backpressure by the count you've specified.
Here is the implementation: https://gist.github.com/driventokill/c49f86fb0cc182994ef423a70e793a2d
У меня возникли проблемы с решением из /questions/36761947/rxjava-buffer-so-vremenem-kotoroe-uchityivaet-obratnoe-davlenie/55142622#55142622 при использованииDisposableSubscriber
как подписчики, и насколько я вижу этот преобразователь не учитывает звонки Suscription#request
от нижестоящих подписчиков (это может их переполнять). Создаю свою версию, протестированную в продакшене - BufferTransformerHonorableToBackpressure.java. Fang Yang - большое уважение к идее.
У меня был еще один вариант, который привел к довольно сверхмощному решению, которое, кажется, работает (ТМ)
Требования:
- Оператор буферизации, который освобождает буфер по истечении определенного промежутка времени, или когда буфер достигает максимального размера, в зависимости от того, что произойдет раньше
- Оператор должен иметь полное обратное давление, то есть, если запросы прекращаются из нисходящего потока, оператор буфера не должен отправлять данные и не должен вызывать каких-либо исключений (как это делает оператор starndard Flowable.buffer(interval, TimeUnit). Оператор не должен использовать его источник / вверх по течению в неограниченном режиме либо
- Сделайте это с составлением существующих / реализованных операторов.
Зачем кому-то этого хотеть?:
Потребность в таком операторе возникла, когда я захотел реализовать буферизацию в бесконечном / длинном потоке. Я хотел буферизовать для эффективности, но стандарт Flowable.buffer (n) здесь не подходит, так как "бесконечный" поток может испускать k Схема решения: Решение основано на Код: https://gist.github.com/artur-jablonski/5eb2bb470868d9eeeb3c9ee247110d4agenerateAsync
а также partialCollect
операторы, оба реализованы в проекте https://github.com/akarnokd/RxJava2Extensions. Остальное - стандарт RxJava2.C
merge
этот поток с потоком, который использует источник generateAsync
, Этот поток использует switchMap
испускать экземпляры C
это фактически сигналы тайм-аута. partialCollect
который содержит ссылку на объект "API" для передачи элементов в generateAsync
вверх по течению. Это своего рода петля обратной связи, которая идет от paritialCollect
через объект "API" для generateAsync
который питает partialCollect
, В этом случае partialCollect
может при получении первого элемента в буфере испустить сигнал, который будет эффективно начинать тайм-аут. Если буфер не заполняется до истечения времени ожидания, это вызовет экземпляр пустого C
(не содержащий никакого значения) возвращаясь в partialCollect
, Он обнаружит его как сигнал тайм-аута и освободит объединенный буфер в нисходящем направлении. Если буфер освобождается из-за достижения максимального размера, он будет освобожден, и следующий элемент будет запущен еще раз. Любой сигнал тайм-аута (экземпляр пустого C
) прибывающий поздно, то есть после освобождения буфера из-за достижения максимального размера, игнорируется. Это возможно, потому что это partialCollect
это создает и отправляет элемент сигнала тайм-аута, который потенциально может вернуться к нему. Проверка идентичности этого элемента позволяет обнаруживать сигнал позднего и законного времени ожидания.
Это было какое-то время, но я снова посмотрел на это, и почему-то мне показалось, что это:
public static <T> FlowableTransformer<T, List<T>> buffer(
int n, long period, TimeUnit unit)
{
return o ->
o.groupBy(__ -> 1)
.concatMapMaybe(
gf ->
gf.take(n)
.take(period, SECONDS)
.toList()
.filter(l -> !l.isEmpty())
);
}
в значительной степени делает то, что я описал. Это, если я правильно, полностью обратное давление и будет либо буферизировать n элементов, либо по истечении указанного времени, если достаточное количество элементов не было собрано