Spring Boot 3 с Kafka подтверждает, что обратный вызов не работает
Я тестирую Spring Boot 3 с помощью Spring-Kafka. Я хотел бы получать уведомление (аналогично обратному вызову), когда отправленное сообщение было обработано (когда подтверждение выполняется в потребителе). В настоящее время сообщение «Сообщение отправлено успешно» печатается без ожидания подтверждения сообщения. Вот код, который я использую:
Контроллер сообщений:
@RestController
@RequestMapping("/messages")
@RequiredArgsConstructor
@Slf4j
public class MessageController {
private final KafkaTemplate<Object, String> kafkaTemplate;
@GetMapping
public void sendMessage() {
CompletableFuture<SendResult<Object, String>> future = kafkaTemplate.send("message-topic", "message");
future.whenComplete((result, ex) -> {
if (ex == null) {
log.info("Message sent successfully {}", result);
} else {
log.error("Error sending message", ex);
}
});
}
@KafkaListener(id = "message-topic-listener", topics = "message-topic")
public void messageListener(@Payload String message, Acknowledgment acknowledgment) throws InterruptedException {
Thread.sleep(10000L);
log.info("Processing message {}", message);
acknowledgment.acknowledge();
}
}
application.yml
spring:
kafka:
producer:
bootstrap-servers: localhost:29092
listener:
ack-mode: MANUAL
consumer:
enable-auto-commit: false
auto-offset-reset: earliest
bootstrap-servers: localhost:29092
1 ответ
Поверьте, я нашел ответ на свой вопрос выше: метод WhenComplete класса SendResult выполняется, когда сообщение успешно записывается/реплицируется в раздел, а не при событии фиксации потребителя.