Экспоненциальный откат с гарантией заказа сообщения с использованием spring-kafka

Я пытаюсь реализовать потребителя Spring Boot на основе Kafka, который имеет некоторые очень надежные гарантии доставки сообщений, даже в случае ошибки.

  • сообщения из раздела должны обрабатываться по порядку,
  • если обработка сообщения не удалась, использование определенного раздела должно быть приостановлено,
  • обработка должна быть повторена с отсрочкой, пока она не будет успешной.

Наша текущая реализация удовлетворяет этим требованиям:

  @Bean
  public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRetryTemplate(retryTemplate());

    final ContainerProperties containerProperties = factory.getContainerProperties();
    containerProperties.setAckMode(AckMode.MANUAL_IMMEDIATE);
    containerProperties.setErrorHandler(errorHandler());

    return factory;
  }

  @Bean
  public RetryTemplate retryTemplate() {

    final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(1000);
    backOffPolicy.setMultiplier(1.5);

    final RetryTemplate template = new RetryTemplate();
    template.setRetryPolicy(new AlwaysRetryPolicy());    
    template.setBackOffPolicy(backOffPolicy);

    return template;
  }

  @Bean
  public ErrorHandler errorHandler() {
    return new SeekToCurrentErrorHandler();
  }

Тем не менее, здесь, запись заблокирована потребителем навсегда. В какой-то момент время обработки превысит max.poll.interval.ms и сервер переназначит раздел другому потребителю, создавая дубликат.

Если предположить, max.poll.interval.ms равный 5 минутам (по умолчанию) и сбоям, продолжающимся 30 минут, это приведет к тому, что сообщение будет обработано ок. 6 раз

Другая возможность - вернуть сообщения в очередь после N попыток (например, 3 попытки), используя SimpleRetryPolicy, Затем сообщение будет воспроизведено (спасибо SeekToCurrentErrorHandler) и обработка начнется с нуля, опять же до 5 попыток. Это приводит к задержкам, образующим серию, например

10 secs -> 30 secs -> 90 secs -> 10 secs -> 30 secs -> 90 secs -> ...

что менее желательно, чем постоянно растущий:)

Есть ли третий сценарий, который мог бы удерживать задержки, формирующие восходящий ряд, и в то же время не создавать дубликаты в вышеупомянутом примере?

1 ответ

Решение

Это может быть сделано с повторением с отслеживанием состояния - в этом случае исключение выдается после каждой повторной попытки, но состояние сохраняется в объекте состояния повторной попытки, поэтому при следующей доставке этого сообщения будет использоваться следующая задержка и т. Д.

Для этого требуется что-то в сообщении (например, заголовок), чтобы однозначно идентифицировать каждое сообщение. К счастью, с Kafka тема, раздел и смещение предоставляют этот уникальный ключ для государства.

Однако в настоящее время RetryingMessageListenerAdapter не поддерживает повторение с сохранением состояния.

Вы можете отключить повтор в фабрике контейнера слушателя и использовать состояние с сохранением RetryTemplate в вашем слушателе, используя один из execute методы, которые принимают RetryState аргумент.

Не стесняйтесь добавлять проблему GitHub для платформы для поддержки повторения с сохранением состояния; взносы приветствуются! - выдан запрос на извлечение.

РЕДАКТИРОВАТЬ

Я только что написал тестовый пример, чтобы продемонстрировать использование восстановления с сохранением состояния с @KafkaListener...

/*
 * Copyright 2018 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.springframework.kafka.annotation;

import static org.assertj.core.api.Assertions.assertThat;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.SeekToCurrentErrorHandler;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.kafka.test.rule.KafkaEmbedded;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.retry.RetryState;
import org.springframework.retry.backoff.ExponentialBackOffPolicy;
import org.springframework.retry.support.DefaultRetryState;
import org.springframework.retry.support.RetryTemplate;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

/**
 * @author Gary Russell
 * @since 5.0
 *
 */
@RunWith(SpringRunner.class)
@DirtiesContext
public class StatefulRetryTests {

    private static final String DEFAULT_TEST_GROUP_ID = "statefulRetry";

    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, 1, "sr1");

    @Autowired
    private Config config;

    @Autowired
    private KafkaTemplate<Integer, String> template;

    @Test
    public void testStatefulRetry() throws Exception {
        this.template.send("sr1", "foo");
        assertThat(this.config.listener1().latch1.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.listener1().latch2.await(10, TimeUnit.SECONDS)).isTrue();
        assertThat(this.config.listener1().result).isTrue();
    }

    @Configuration
    @EnableKafka
    public static class Config {

        @Bean
        public KafkaListenerContainerFactory<?> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            factory.getContainerProperties().setErrorHandler(new SeekToCurrentErrorHandler());
            return factory;
        }

        @Bean
        public DefaultKafkaConsumerFactory<Integer, String> consumerFactory() {
            return new DefaultKafkaConsumerFactory<>(consumerConfigs());
        }

        @Bean
        public Map<String, Object> consumerConfigs() {
            Map<String, Object> consumerProps =
                    KafkaTestUtils.consumerProps(DEFAULT_TEST_GROUP_ID, "false", embeddedKafka);
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            return consumerProps;
        }

        @Bean
        public KafkaTemplate<Integer, String> template() {
            KafkaTemplate<Integer, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
            return kafkaTemplate;
        }

        @Bean
        public ProducerFactory<Integer, String> producerFactory() {
            return new DefaultKafkaProducerFactory<>(producerConfigs());
        }

        @Bean
        public Map<String, Object> producerConfigs() {
            return KafkaTestUtils.producerProps(embeddedKafka);
        }

        @Bean
        public Listener listener1() {
            return new Listener();
        }

    }

    public static class Listener {

        private static final RetryTemplate retryTemplate = new RetryTemplate();

        private static final ConcurrentMap<String, RetryState> states = new ConcurrentHashMap<>();

        static {
            ExponentialBackOffPolicy backOff = new ExponentialBackOffPolicy();
            retryTemplate.setBackOffPolicy(backOff);
        }

        private final CountDownLatch latch1 = new CountDownLatch(3);

        private final CountDownLatch latch2 = new CountDownLatch(1);

        private volatile boolean result;

        @KafkaListener(topics = "sr1", groupId = "sr1")
        public void listen1(final String in, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.OFFSET) long offset) {
            String recordKey = topic + partition + offset;
            RetryState retryState = states.get(recordKey);
            if (retryState == null) {
                retryState = new DefaultRetryState(recordKey);
                states.put(recordKey, retryState);
            }
            this.result = retryTemplate.execute(c -> {

                // do your work here

                this.latch1.countDown();
                throw new RuntimeException("retry");
            }, c -> {
                latch2.countDown();
                return true;
            }, retryState);
            states.remove(recordKey);
        }

    }

}

а также

Seek to current after exception; nested exception is org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method 'public void org.springframework.kafka.annotation.StatefulRetryTests$Listener.listen1(java.lang.String,java.lang.String,int,long)' threw exception; nested exception is java.lang.RuntimeException: retry

после каждой попытки доставки.

В этом случае я добавил средство восстановления для обработки сообщения после того, как повторные попытки исчерпаны. Вы можете сделать что-то еще, например, остановить контейнер (но сделать это в отдельном потоке, как мы делаем в ContainerStoppingErrorHandler).

Другие вопросы по тегам