Reactor UnicastProcessor drop on backpressure when queue is full

I'm trying to use a `UnicastProcessor` to queue up a bounded number of events (to absorb bursts of load, and so that I can check the size of the queue). The problem is that backpressure doesn't seem to work the way I expect.

According to the docs, backpressure is supported:

> If that queue is bounded, the processor could reject the push of a value when the buffer is full and not enough requests from downstream have been received.
>
> In that bounded case, you can also build the processor with a callback that is invoked on each rejected element, allowing for cleanup of these rejected elements.
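
My reading of that contract, as a minimal sketch (the queue capacity, the names, and the expectations in the comments are my own assumptions, not verified behavior):

    UnicastProcessor<Integer> p = UnicastProcessor.create(
            new ArrayBlockingQueue<>(2),                    // bounded queue
            rejected -> log.error("Rejected {}", rejected), // callback per rejected element
            () -> {});

    // a subscriber that never requests, so downstream demand stays at zero
    p.subscribe(new BaseSubscriber<Integer>() {
        @Override
        protected void hookOnSubscribe(Subscription subscription) {
            // deliberately request nothing
        }
    });

    p.onNext(1); // queued
    p.onNext(2); // queued, buffer now full
    p.onNext(3); // what I'd expect: the rejection callback fires and the processor stays usable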

Here is a toy example:

    Flux<Long> interval = Flux.interval(Duration.ZERO, Duration.ofMillis(100));
    BlockingQueue<Long> queue = new LinkedBlockingQueue<>(10);
    UnicastProcessor<Long> processor = UnicastProcessor.create(queue,
            i -> {
                // this callback is only ever invoked once (see the logs below)
                log.error("Dropping in UnicastProcessor i={}, QueueSize={}", i, queue.size());
            }, () -> {

            }
    );
    // FluxSink<Long> sink = processor.sink(FluxSink.OverflowStrategy.DROP); // using a sink facade doesn't seem to help either

    interval
        .publishOn(Schedulers.newSingle("sending"))
        .doOnNext(i -> log.error("Sent i={}, QueueSize={}", i, queue.size()))
        .subscribe(processor);
        // .doOnNext(sink::next)
        // .subscribe();

    processor
        // Adding backpressure here makes the queue size always zero
        // .onBackpressureDrop(i -> log.error("Dropping after UnicastProcessor i={}, QueueSize={}", i, queue.size()))
        .publishOn(Schedulers.newSingle("receiving"))
        .doOnNext(i -> log.error("Received i={}, QueueSize={}", i, queue.size()))
        .doOnNext(i -> {
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        })
        .subscribe();

    sleep(20000); // Keep running long enough to see the exception

The ideal behavior is for the queue to fill up when the subscriber isn't quick enough, and for anything else that comes in to be dropped.
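
In other words, I want roughly the semantics of a bounded `onBackpressureBuffer` with `DROP_LATEST`. Here is a sketch of the behavior I'm after (the prefetch of 1 on `publishOn` is my attempt at keeping downstream demand low; the operator choice is an illustration, not something I've ruled in or out):

    Flux.interval(Duration.ZERO, Duration.ofMillis(100))
        .onBackpressureBuffer(10,
                dropped -> log.error("Dropping i={}", dropped), // should fire for every overflowing element
                BufferOverflowStrategy.DROP_LATEST)             // keep the 10 buffered, drop new arrivals
        .publishOn(Schedulers.newSingle("receiving"), 1)
        .subscribe(i -> {
            log.error("Received i={}", i);
            try {
                sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });

The catch is that `onBackpressureBuffer` doesn't let me inspect the buffer's size, which is why I reached for `UnicastProcessor` with my own queue in the first place.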

Here are some logs (`...` indicates similar log lines with incrementing `i` values and the same `QueueSize`):

    [sending-1] ERROR Test - Sent i=0, QueueSize=0
    [receiving-2] ERROR Test - Received i=0, QueueSize=0
    [sending-1] ERROR Test - Sent i=1, QueueSize=0
    ...
    [sending-1] ERROR Test - Sent i=10, QueueSize=9
    [receiving-2] ERROR Test - Received i=1, QueueSize=9
    [sending-1] ERROR Test - Sent i=11, QueueSize=9
    [sending-1] ERROR Test - Sent i=12, QueueSize=10
    [sending-1] ERROR Test - Dropping in UnicastProcessor i=12, QueueSize=10
    [sending-1] ERROR Test - Sent i=13, QueueSize=10
    ...
    [sending-1] ERROR Test - Sent i=20, QueueSize=10
    [receiving-2] ERROR Test - Received i=2, QueueSize=9
    [sending-1] ERROR Test - Sent i=21, QueueSize=9
    [sending-1] ERROR Test - Sent i=22, QueueSize=9
    ...
    [receiving-2] ERROR Test - Received i=10, QueueSize=1
    [sending-1] ERROR Test - Sent i=101, QueueSize=1
    ...
    [sending-1] ERROR Test - Sent i=110, QueueSize=1
    [receiving-2] ERROR Test - Received i=11, QueueSize=0
    [sending-1] ERROR Test - Sent i=111, QueueSize=0
    ...
    [sending-1] ERROR Test - Sent i=120, QueueSize=0
    [receiving-2] ERROR reactor.core.scheduler.Schedulers - Scheduler worker in group main failed with an uncaught exception
    reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
    Caused by: reactor.core.Exceptions$OverflowException: The receiver is overrun by more signals than expected (bounded queue...)
        at reactor.core.Exceptions.failWithOverflow(Exceptions.java:202) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.publisher.UnicastProcessor.onNext(UnicastProcessor.java:373) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:204) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:398) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484) ~[reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84) [reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37) [reactor-core-3.2.10.RELEASE.jar:3.2.10.RELEASE]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_232]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_232]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_232]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_232]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_232]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_232]
    [sending-1] ERROR Test - Sent i=121, QueueSize=0
    [sending-1] ERROR Test - Sent i=122, QueueSize=0
    ...

Could it be that `.subscribe(processor)` on the interval doesn't propagate backpressure?
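
One way I can think of to check that would be to log the demand the processor signals upstream, something like this (a sketch; I'm assuming a `doOnRequest` placed right before the `subscribe` sees the processor's own requests):

    interval
        .publishOn(Schedulers.newSingle("sending"))
        .doOnRequest(n -> log.error("Processor requested n={}", n))
        .subscribe(processor);

If that logs `n=9223372036854775807` (`Long.MAX_VALUE`), the processor isn't limiting its demand to the queue's capacity, and the drops come from the queue overflowing rather than from backpressure.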

What I'm really trying to use this for is queuing incoming packets from Reactor Netty, so I tagged this question with it in case anyone has been trying to do something similar.
