Quarkus + Kafka + обработка исключений Smallrye

Как я могу обработать исключение при потоковой обработке с помощью quarkus + kafka + smallrye?

Мой код очень похож на пример императивного производителя в руководстве по quarkus (https://quarkus.io/guides/kafka)

import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;

import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;

@Path("/prices")
public class PriceResource {

    @Inject @Channel("price-create") Emitter<Double> priceEmitter;

    @POST
    @Consumes(MediaType.TEXT_PLAIN)
    public void addPrice(Double price) {
        priceEmitter.send(price);
    }
}

Я хотел что-то похожее на ванильную библиотеку Kafka, которая дает возможность обрабатывать обратный вызов каждой запрошенной записи.

ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", key, value);

producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        logger.info(record.toString());

        if (exception != null) {
            logger.error("Producer exception", exception);
        }
    }
});

Tks

1 ответ

Есть раздел документации о признании

@Incoming("i")
@Outgoing("j")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
  public CompletionStage<Message<String>> manualAck(Message<String> input) {
    return CompletableFuture.supplyAsync(input::getPayload)
      .thenApply(Message::of)
      .thenCompose(m -> input.ack().thenApply(x -> m));
  }
Другие вопросы по тегам