Quarkus + Kafka + обработка исключений Smallrye
Как я могу обработать исключение при потоковой обработке с помощью quarkus + kafka + smallrye?
Мой код очень похож на пример императивного производителя в руководстве по quarkus (https://quarkus.io/guides/kafka)
import io.smallrye.reactive.messaging.annotations.Channel;
import io.smallrye.reactive.messaging.annotations.Emitter;
import javax.inject.Inject;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Consumes;
import javax.ws.rs.core.MediaType;
@Path("/prices")
public class PriceResource {
@Inject @Channel("price-create") Emitter<Double> priceEmitter;
@POST
@Consumes(MediaType.TEXT_PLAIN)
public void addPrice(Double price) {
priceEmitter.send(price);
}
}
Я хотел что-то похожее на ванильную библиотеку Kafka, которая дает возможность обрабатывать обратный вызов каждой запрошенной записи.
ProducerRecord<String, String> record = new ProducerRecord<>("topic-name", key, value);
producer.send(record, new Callback() {
@Override
public void onCompletion(RecordMetadata metadata, Exception exception) {
logger.info(record.toString());
if (exception != null) {
logger.error("Producer exception", exception);
}
}
});
Tks
1 ответ
Есть раздел документации о признании
@Incoming("i")
@Outgoing("j")
@Acknowledgment(Acknowledgment.Strategy.MANUAL)
public CompletionStage<Message<String>> manualAck(Message<String> input) {
return CompletableFuture.supplyAsync(input::getPayload)
.thenApply(Message::of)
.thenCompose(m -> input.ack().thenApply(x -> m));
}