Spring Kafka Retry Logging
У меня есть требование использовать тему из kafka, выполнить некоторую работу с записями и создать другую тему с помощью spring-kafka 2.1.7. Другие требования требуют транзакций только для семантики, повторных попыток и обработки ошибок. При неудачной фиксации записи я Если необходимо выполнить 3 попытки, зарегистрируйте каждое из сообщений о повторных попытках, чтобы повторить тему, а в случае неудачи всех попыток отправьте запись в пустую тему. Я посмотрел на https://github.com/spring-projects/spring-kafka/issues/575 и там есть отличные детали по решению проблемы. С чем я борюсь, так это как регистрировать каждое сообщение о повторных попытках с такими деталями, как смещение потребителя, тема, которую он пытался зафиксировать, и т. Д. Есть ли способ получить их от обратного вызова? Приведенный ниже фрагмент retrylistener зарегистрирован в org.springframework.kafka.listener.LoggingErrorHandler, который в качестве свойства контейнера установлен для ConcurrentKafkaListenerContainerFactory?
@Bean
public RetryListener retryListener(KafkaTemplate<String,SpecificRecord> kafkaTemplate) {
return new RetryListenerSupport() {
public void onError(RetryContext context, RetryCallback callback, Throwable throwable) {
int retryCount =context.getRetryCount();
kafkaTemplate) .send(new ProducerRecord<String,SpecificRecord>("topic_name",record));
}
};
}
1 ответ
RetryContext
заполнен некоторой полезной информацией в RetryingMessageListenerAdapter
:
context.setAttribute(CONTEXT_RECORD, record);
switch (RetryingMessageListenerAdapter.this.delegateType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
context.setAttribute(CONTEXT_CONSUMER, consumer);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment, consumer);
break;
case ACKNOWLEDGING:
context.setAttribute(CONTEXT_ACKNOWLEDGMENT, acknowledgment);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, acknowledgment);
break;
case CONSUMER_AWARE:
context.setAttribute(CONTEXT_CONSUMER, consumer);
RetryingMessageListenerAdapter.this.delegate.onMessage(record, consumer);
break;
case SIMPLE:
RetryingMessageListenerAdapter.this.delegate.onMessage(record);
}