Неподдерживаемая операция в потоках восстановления Spring-Data-Redis
Я использую реактивный код spring data redis для прослушивания входящих сообщений из потока redis. Иногда я получаю следующее исключение, и я не уверен, когда это произойдет. Когда я копаюсь в исходном коде класса DefaultStreamReceiver spring-data-redis-2.3.2.RELEASE, в строке 360 есть оператор overflow.add(message), который в основном добавляет к SbscArrayQueue реактивной библиотеки, метод добавления которой просто выбрасывает Исключение неподдерживаемой операции.
2020-11-09T19:44:56.438+0000 [elastic-126] [ERROR] reactor.util.Loggers$Slf4JLogger - Scheduler worker in group main failed with an uncaught exception
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.UnsupportedOperationException
Caused by: java.lang.UnsupportedOperationException
at reactor.util.concurrent.SpscArrayQueue.add(SpscArrayQueue.java:152) ~[reactor-core-3.3.8.RELEASE.jar!/:3.3.8.RELEASE]
at org.springframework.data.redis.stream.DefaultStreamReceiver$StreamSubscription.onStreamMessage(DefaultStreamReceiver.java:360) ~[spring-data-redis-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]
at org.springframework.data.redis.stream.DefaultStreamReceiver$StreamSubscription.access$100(DefaultStreamReceiver.java:186) ~[spring-data-redis-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]
at org.springframework.data.redis.stream.DefaultStreamReceiver$StreamSubscription$1.onNext(DefaultStreamReceiver.java:303) ~[spring-data-redis-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]
at org.springframework.data.redis.stream.DefaultStreamReceiver$StreamSubscription$1.onNext(DefaultStreamReceiver.java:294) ~[spring-data-redis-2.3.2.RELEASE.jar!/:2.3.2.RELEASE]
at reactor.core.publisher.FluxUsingWhen$UsingWhenSubscriber.onNext(FluxUsingWhen.java:355) ~[reactor-core-3.3.8.RELEASE.jar!/:3.3.8.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.8.RELEASE.jar!/:3.3.8.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.8.RELEASE.jar!/:3.3.8.RELEASE]
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) ~[reactor-core-3.3.8.RELEASE.jar!/:3.3.8.RELEASE]
at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onNext(MonoFlatMapMany.java:242) ~[reactor-core-3.3.8.RELEASE.jar!/:3.3.8.RELEASE]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:114) ~[reactor-core-3.3.8.RELEASE.jar!/:3.3.8.RELEASE]
at io.lettuce.core.RedisPublisher$ImmediateSubscriber.onNext(RedisPublisher.java:917) ~[lettuce-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at io.lettuce.core.RedisPublisher$RedisSubscription.onNext(RedisPublisher.java:290) ~[lettuce-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at io.lettuce.core.output.StreamingOutput$Subscriber.onNext(StreamingOutput.java:64) ~[lettuce-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at io.lettuce.core.output.StreamReadOutput.complete(StreamReadOutput.java:95) ~[lettuce-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at io.lettuce.core.protocol.RedisStateMachine.doDecode(RedisStateMachine.java:176) ~[lettuce-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at io.lettuce.core.protocol.RedisStateMachine.decode(RedisStateMachine.java:142) ~[lettuce-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:742) ~[lettuce-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.decode0(CommandHandler.java:706) ~[lettuce-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:701) ~[lettuce-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.decode(CommandHandler.java:622) ~[lettuce-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at io.lettuce.core.protocol.CommandHandler.channelRead(CommandHandler.java:591) ~[lettuce-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1526) ~[netty-handler-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1275) ~[netty-handler-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1322) ~[netty-handler-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:501) ~[netty-codec-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:440) ~[netty-codec-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:276) ~[netty-codec-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:163) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:714) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493) ~[netty-transport-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989) ~[netty-common-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.51.Final.jar!/:4.1.51.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.51.Final.jar!/:4.1.51.Final]
at java.lang.Thread.run(Thread.java:834) [?:?]
Мне это определенно кажется весенним багом. Есть идеи, почему это происходит?