Отправка ошибок в тему повторных попыток Kafka с помощью Spring Cloud Stream
Я использую Spring Cloud Stream вместе с Spring Cloud Function и механизмом связывания Kafka. Моя цель проста: я хочу прочитать из входной темы, выполнить некоторую обработку и опубликовать в выходной теме. Если что-то пойдет не так, я хочу вместо этого опубликовать сообщение в теме ошибок. Из прочтения документации выясняется, что поддержка Dead Letter Queue предоставит мне тему с ошибками, которая мне нужна. Мой код ниже:
@SpringBootApplication
@EnableBinding(Processor.class)
public class Application {
public static void main(final String[] args) {
SpringApplication.run(Application.class, args);
}
}
@Configuration
public class AppConfig {
@Bean
public ServiceImpl service() {
return new ServiceImpl();
}
@Bean
public Function<String, String> execute(ServiceImpl service) {
return service::doBusinessLogic;
}
}
public class ServiceImpl {
public String doBusinessLogic(String message) {
System.out.println("Handling message");
if (message.startsWith("foo")) {
throw new RuntimeException("This is a test");
}
return message;
}
}
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9091
autoCreateTopics: false
bindings:
input:
consumer:
enableDlq: true
dlqName: myErrorTopic
bindings:
input:
destination: myInputTopic
group: myConsumerGroup
output:
destination: myOutputTopic
function:
definition: execute
Когда я запускаю это приложение и отправляю сообщение в тему ввода, служба правильно получает сообщение и выдается исключение RuntimeException. Я вижу в журналах, что сообщение повторяется (конфигурация по умолчанию с RetryTemplate), но как только повторные попытки исчерпаны, сообщение не публикуется в теме ошибок. Что я здесь не так делаю?