NPE, использующий тему Kafka
Я получил устрашающий NPE, использующий тему Kafka с использованием Quarkus 1.6.1 и среды реактивного обмена сообщениями Smallrye. Интересно то, что это не происходит при низкой скорости передачи сообщений (~100 сообщений / сек), а, кажется, происходит при более высокой скорости передачи сообщений +300 сообщений / сек. Тестовая среда - это простой производитель сообщений Quarkus Kafka и потребитель, подключенный к одной теме и использующий реактивные аннотации (@Outgoing,@Incoming). Ошибка возникает, когда я загружаю тестовое приложение (mvnw compile quarkus:dev). После нескольких повторных попыток приложение начинает работать так, как должно было быть.
Пожалуйста, проверьте bean-компонент ниже, использующий записи Kafka.
@Incoming("targets")
@Outgoing("parameters-map")
public Map<String, Object> processTarget(KafkaRecord<String, String> target) {
return this.deserializer.deserializeTarget(target.getKey(), target.getPayload());
}
OpenJDK 11
12:23:53 ERROR [io.sm.re.me.provider] (RxComputationThreadPool-1) SRMSG00201: Error caught during the stream processing: io.smallrye.mutiny.CompositeException: Multiple exceptions caught:
[Exception 0] java.lang.NullPointerException
[Exception 1] io.smallrye.reactive.messaging.ProcessingException: SRMSG00103: Exception thrown when calling the method xxx.was.ingestion.indexer.SolrSink#processTarget
at io.smallrye.mutiny.operators.UniOnItemOrFailureFlatMap.invokeAndSubstitute(UniOnItemOrFailureFlatMap.java:34)
at io.smallrye.mutiny.operators.UniOnItemOrFailureFlatMap$1.onFailure(UniOnItemOrFailureFlatMap.java:62)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onFailure$2(ContextPropagationUniInterceptor.java:40)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onFailure(ContextPropagationUniInterceptor.java:40)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.onFailure(UniSerializedSubscriber.java:85)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onFailure$2(ContextPropagationUniInterceptor.java:40)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onFailure(ContextPropagationUniInterceptor.java:40)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.onFailure(UniSerializedSubscriber.java:85)
at io.smallrye.mutiny.operators.UniOnItemMap$1.onItem(UniOnItemMap.java:35)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.lambda$onItem$1(ContextPropagationUniInterceptor.java:35)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$1.onItem(ContextPropagationUniInterceptor.java:35)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.onItem(UniSerializedSubscriber.java:72)
at io.smallrye.mutiny.operators.UniCreateFromCompletionStage.lambda$forwardFromCompletionStage$1(UniCreateFromCompletionStage.java:30)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture.uniWhenCompleteStage(CompletableFuture.java:883)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:2251)
at java.base/java.util.concurrent.CompletableFuture.whenComplete(CompletableFuture.java:143)
at io.smallrye.mutiny.operators.UniCreateFromCompletionStage.forwardFromCompletionStage(UniCreateFromCompletionStage.java:22)
at io.smallrye.mutiny.operators.UniCreateFromCompletionStage.subscribing(UniCreateFromCompletionStage.java:50)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:51)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:51)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.operators.UniOnItemMap.subscribing(UniOnItemMap.java:19)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:51)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:51)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.operators.UniOnItemOrFailureFlatMap.subscribing(UniOnItemOrFailureFlatMap.java:48)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.lambda$subscribing$0(ContextPropagationUniInterceptor.java:51)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationUniInterceptor$2.subscribing(ContextPropagationUniInterceptor.java:51)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:43)
at io.smallrye.mutiny.operators.UniSerializedSubscriber.subscribe(UniSerializedSubscriber.java:38)
at io.smallrye.mutiny.groups.UniSubscribe.withSubscriber(UniSubscribe.java:49)
at io.smallrye.mutiny.operators.UniProduceMultiOnItem.subscribe(UniProduceMultiOnItem.java:36)
at io.smallrye.mutiny.operators.AbstractMulti.subscribe(AbstractMulti.java:43)
at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
at io.smallrye.mutiny.groups.MultiCreate$1.subscribe(MultiCreate.java:159)
at io.smallrye.mutiny.groups.MultiSubscribe.withSubscriber(MultiSubscribe.java:48)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.lambda$subscribe$0(ContextPropagationMultiInterceptor.java:58)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$2.subscribe(ContextPropagationMultiInterceptor.java:58)
at io.smallrye.mutiny.operators.multi.MultiFlatMapOp$FlatMapMainSubscriber.onItem(MultiFlatMapOp.java:191)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.subscription.SerializedSubscriber.onItem(SerializedSubscriber.java:69)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:87)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:87)
at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:81)
at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
at io.smallrye.mutiny.streams.utils.ConnectableProcessor.onNext(ConnectableProcessor.java:122)
at io.smallrye.mutiny.streams.utils.WrappedProcessor.onNext(WrappedProcessor.java:44)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
at io.smallrye.mutiny.subscription.SafeSubscriber.onNext(SafeSubscriber.java:87)
at io.reactivex.internal.util.HalfSerializer.onNext(HalfSerializer.java:45)
at io.reactivex.internal.subscribers.StrictSubscriber.onNext(StrictSubscriber.java:97)
at io.reactivex.internal.operators.flowable.FlowableMap$MapSubscriber.onNext(FlowableMap.java:68)
at io.reactivex.internal.operators.flowable.FlowableOnBackpressureDrop$BackpressureDropSubscriber.onNext(FlowableOnBackpressureDrop.java:84)
at io.reactivex.internal.operators.flowable.FlowableInterval$IntervalSubscriber.run(FlowableInterval.java:93)
at io.reactivex.internal.schedulers.ScheduledDirectPeriodicTask.run(ScheduledDirectPeriodicTask.java:38)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.runAndReset(FutureTask.java:305)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:305)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)