Управление Quarkus BackPression

Я получил следующую трассировку стека, используя реактивный обмен сообщениями quarkus с kafka:

        at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
        at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
        at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
        at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
        at java.lang.Thread.run(Thread.java:748)
        at org.jboss.threads.JBossThread.run(JBossThread.java:479)


//very very long sql function call

2020-08-04 12:05:35,739 ERROR [io.sma.rea.mes.kafka] (executor-thread-78) SRMSG18207: Unable to dispatch message to Kafka: io.smallrye.mutiny.subscription.BackPressureFailure: Buffer is full due to lack of downstream consumption
        at io.smallrye.mutiny.operators.multi.overflow.MultiOnOverflowBufferOp$OnOverflowBufferProcessor.onItem(MultiOnOverflowBufferOp.java:80)
        at io.smallrye.mutiny.subscription.MultiSubscriber.onNext(MultiSubscriber.java:61)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.lambda$onNext$1(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.context.SmallRyeThreadContext.lambda$withContext$0(SmallRyeThreadContext.java:217)
        at io.smallrye.mutiny.context.ContextPropagationMultiInterceptor$1.onNext(ContextPropagationMultiInterceptor.java:36)
        at io.smallrye.mutiny.helpers.HalfSerializer.onNext(HalfSerializer.java:31)
        at io.smallrye.mutiny.helpers.StrictMultiSubscriber.onItem(StrictMultiSubscriber.java:81)
        at io.smallrye.mutiny.operators.multi.builders.IntervalMulti$IntervalRunnable.run(IntervalMulti.java:77)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at org.jboss.threads.ContextClassLoaderSavingRunnable.run(ContextClassLoaderSavingRunnable.java:35)
        at org.jboss.threads.EnhancedQueueExecutor.safeRun(EnhancedQueueExecutor.java:2046)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.doRunTask(EnhancedQueueExecutor.java:1578)
        at org.jboss.threads.EnhancedQueueExecutor$ThreadBody.run(EnhancedQueueExecutor.java:1452)
        at org.jboss.threads.DelegatingRunnable.run(DelegatingRunnable.java:29)
        at org.jboss.threads.ThreadLocalResettingRunnable.run(ThreadLocalResettingRunnable.java:29)
        at java.lang.Thread.run(Thread.java:748)
        at org.jboss.threads.JBossThread.run(JBossThread.java:479)

Вот мой код:

@Outgoing( "eqs-crossing-mid" )
    public Multi< EQSAlert > eqsCrossingTGV_MID(){

        final String series = CrossingEnum.TGV_MID.getSeries();
        final String equipment = CrossingEnum.TGV_MID.getEquipment();

        final String vehicleRegex = vehicleRegexService.getRegexBySeries( series );

        log.info( "Incoming request for {} - {}",  series, equipment);
        log.info( "Vehicle regex : {}", vehicleRegex );

        return Multi
                .createFrom()
                .ticks()
                .every(
                        Duration.ofSeconds( poolingInterval )
                )
                .onOverflow()
                .buffer(10)
                .concatMap(i -> {
                    final Multi<CrossingState> crossingStateBySeriesAndEquipment = CrossingState.getCrossingStateBySeriesAndEquipment(client, series, equipment);

                    return crossingStateBySeriesAndEquipment.flatMap(crossingState ->
                            crossingState.isActive() ?
                                    EQSAlert.getEQSAlertBySeriesAndEquipment(
                                            client,
                                            series,
                                            vehicleRegex,
                                            equipment
                                    )
                                    :
                                    Multi.createFrom().empty()
                    );
                });
    }

Как вы можете видеть, я выполняю вызов функции sql каждые 5 секунд (интервал объединения). Я заметил, что получил эту ошибку после того, как совершил ошибочную версию моей функции sql. Обычно время вызова составляет менее 1 секунды, и там функции составляют до 2 минут для отправки ответа.

Итак, я исправил свою функцию sql, и теперь все в порядке.

Цель моего поста - понять, что случилось с приложением? Это потому, что у меня слишком большой стек вызовов sql?

Спасибо

0 ответов

Другие вопросы по тегам