Spring Boot 3 с Kafka подтверждает, что обратный вызов не работает

Я тестирую Spring Boot 3 с помощью Spring-Kafka. Я хотел бы получать уведомление (аналогично обратному вызову), когда отправленное сообщение было обработано (когда подтверждение выполняется в потребителе). В настоящее время сообщение «Сообщение отправлено успешно» печатается без ожидания подтверждения сообщения. Вот код, который я использую:

Контроллер сообщений:

      @RestController
@RequestMapping("/messages")
@RequiredArgsConstructor
@Slf4j
public class MessageController {

  private final KafkaTemplate<Object, String> kafkaTemplate;

  @GetMapping
  public void sendMessage() {
    CompletableFuture<SendResult<Object, String>> future = kafkaTemplate.send("message-topic", "message");
    future.whenComplete((result, ex) -> {
      if (ex == null) {
        log.info("Message sent successfully {}", result);
      } else {
        log.error("Error sending message", ex);
      }
    });
  }

  @KafkaListener(id = "message-topic-listener", topics = "message-topic")
  public void messageListener(@Payload String message, Acknowledgment acknowledgment) throws InterruptedException {
    Thread.sleep(10000L);
    log.info("Processing message {}", message);
    acknowledgment.acknowledge();
  }
}

application.yml

      spring:
  kafka:
    producer:
      bootstrap-servers: localhost:29092
    listener:
      ack-mode: MANUAL
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      bootstrap-servers: localhost:29092

1 ответ

Поверьте, я нашел ответ на свой вопрос выше: метод WhenComplete класса SendResult выполняется, когда сообщение успешно записывается/реплицируется в раздел, а не при событии фиксации потребителя.

Другие вопросы по тегам