Spring Kafka Retry Logging

У меня есть требование использовать тему из kafka, выполнить некоторую работу с записями и создать другую тему с помощью spring-kafka 2.1.7. Другие требования требуют транзакций только для семантики, повторных попыток и обработки ошибок. При неудачной фиксации записи я Если необходимо выполнить 3 попытки, зарегистрируйте каждое из сообщений о повторных попытках, чтобы повторить тему, а в случае неудачи всех попыток отправьте запись в пустую тему. Я посмотрел на https://github.com/spring-projects/spring-kafka/issues/575 и там есть отличные детали по решению проблемы. С чем я борюсь, так это как регистрировать каждое сообщение о повторных попытках с такими деталями, как смещение потребителя, тема, которую он пытался зафиксировать, и т. Д. Есть ли способ получить их от обратного вызова? Приведенный ниже фрагмент retrylistener зарегистрирован в org.springframework.kafka.listener.LoggingErrorHandler, который в качестве свойства контейнера установлен для ConcurrentKafkaListenerContainerFactory?

         @Bean
         public RetryListener retryListener(KafkaTemplate<String,SpecificRecord> kafkaTemplate) {
             return new RetryListenerSupport() {

                public void onError(RetryContext context, RetryCallback callback, Throwable throwable) {
                    int retryCount =context.getRetryCount();
                    kafkaTemplate) .send(new ProducerRecord<String,SpecificRecord>("topic_name",record));
                }
             };
         }

1 ответ

Решение

RetryContext заполнен некоторой полезной информацией в RetryingMessageListenerAdapter:

context.setAttribute(CONTEXT_RECORD, record);
switch (RetryingMessageListenerAdapter.this.delegateType) {
    case ACKNOWLEDGING_CONSUMER_AWARE:
        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
        context.setAttribute(CONTEXT_CONSUMER, consumer);
        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
        break;
    case ACKNOWLEDGING:
        context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
        RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
        break;
    case CONSUMER_AWARE:
        context.setAttribute(CONTEXT_CONSUMER, consumer);
        RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
        break;
    case SIMPLE:
        RetryingMessageListenerAdapter.this.delegate.onMessage(record);
}
Другие вопросы по тегам