rxJava buffer() со временем, которое учитывает обратное давление

Версии buffer Оператор, который не работает по времени, учитывает обратное давление согласно JavaDoc:

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html

Однако любая версия buffer который включает временные буферы, не поддерживает противодавление, как этот

http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html

Я понимаю, что это происходит из-за того, что, как только время идет, вы не можете остановить его так же, как, например, interval оператор, который не поддерживает противодавление либо по той же причине.

Мне нужен оператор буферизации, основанный как на размере, так и на времени, и полностью поддерживающий противодавление, передавая сигналы противодавления ОБА вверх по течению и производителю, тикающему время, что-то вроде этого:

someFlowable() .buffer( Flowable.interval(1, SECONDS).onBackpressureDrop(), 10 );

Так что теперь я могу снять галочку на сигналах противодавления.

Это в настоящее время достижимо в rxJava2? Как насчет Project-Reactor?

1 ответ

I've encountered the problem recently and here is my implementation. It can be used like this:

    Flowable<List<T>> bufferedFlow = (some flowable of T)
                              .compose(new BufferTransformer(1, TimeUnit.MILLISECONDS, 8))

It supports backpressure by the count you've specified.

Here is the implementation: https://gist.github.com/driventokill/c49f86fb0cc182994ef423a70e793a2d

У меня возникли проблемы с решением из /questions/36761947/rxjava-buffer-so-vremenem-kotoroe-uchityivaet-obratnoe-davlenie/55142622#55142622 при использованииDisposableSubscriber как подписчики, и насколько я вижу этот преобразователь не учитывает звонки Suscription#requestот нижестоящих подписчиков (это может их переполнять). Создаю свою версию, протестированную в продакшене - BufferTransformerHonorableToBackpressure.java. Fang Yang - большое уважение к идее.

У меня был еще один вариант, который привел к довольно сверхмощному решению, которое, кажется, работает (ТМ)

Требования:

  1. Оператор буферизации, который освобождает буфер по истечении определенного промежутка времени, или когда буфер достигает максимального размера, в зависимости от того, что произойдет раньше
  2. Оператор должен иметь полное обратное давление, то есть, если запросы прекращаются из нисходящего потока, оператор буфера не должен отправлять данные и не должен вызывать каких-либо исключений (как это делает оператор starndard Flowable.buffer(interval, TimeUnit). Оператор не должен использовать его источник / вверх по течению в неограниченном режиме либо
  3. Сделайте это с составлением существующих / реализованных операторов.

Зачем кому-то этого хотеть?:

Потребность в таком операторе возникла, когда я захотел реализовать буферизацию в бесконечном / длинном потоке. Я хотел буферизовать для эффективности, но стандарт Flowable.buffer (n) здесь не подходит, так как "бесконечный" поток может испускать k

Схема решения:

Решение основано на generateAsync а также partialCollect операторы, оба реализованы в проекте https://github.com/akarnokd/RxJava2Extensions. Остальное - стандарт RxJava2.

  1. Сначала оберните все значения из апстрима в классе контейнера C
  2. затем merge этот поток с потоком, который использует источник generateAsync, Этот поток использует switchMap испускать экземпляры C это фактически сигналы тайм-аута.
  3. Два объединенных потока текут в partialCollect который содержит ссылку на объект "API" для передачи элементов в generateAsync вверх по течению. Это своего рода петля обратной связи, которая идет от paritialCollect через объект "API" для generateAsync который питает partialCollect, В этом случае partialCollect может при получении первого элемента в буфере испустить сигнал, который будет эффективно начинать тайм-аут. Если буфер не заполняется до истечения времени ожидания, это вызовет экземпляр пустого C (не содержащий никакого значения) возвращаясь в partialCollect, Он обнаружит его как сигнал тайм-аута и освободит объединенный буфер в нисходящем направлении. Если буфер освобождается из-за достижения максимального размера, он будет освобожден, и следующий элемент будет запущен еще раз. Любой сигнал тайм-аута (экземпляр пустого C) прибывающий поздно, то есть после освобождения буфера из-за достижения максимального размера, игнорируется. Это возможно, потому что это partialCollect это создает и отправляет элемент сигнала тайм-аута, который потенциально может вернуться к нему. Проверка идентичности этого элемента позволяет обнаруживать сигнал позднего и законного времени ожидания.

Код: https://gist.github.com/artur-jablonski/5eb2bb470868d9eeeb3c9ee247110d4a

Это было какое-то время, но я снова посмотрел на это, и почему-то мне показалось, что это:

public static <T> FlowableTransformer<T, List<T>> buffer(
    int n, long period, TimeUnit unit)
{
    return o ->
        o.groupBy(__ -> 1)
         .concatMapMaybe(
             gf ->
                 gf.take(n)
                   .take(period, SECONDS)
                   .toList()
                   .filter(l -> !l.isEmpty())
         );
}

в значительной степени делает то, что я описал. Это, если я правильно, полностью обратное давление и будет либо буферизировать n элементов, либо по истечении указанного времени, если достаточное количество элементов не было собрано

Другие вопросы по тегам