Как ввести KafkaTemplate в Quarkus
Я пытаюсь ввести KafkaTemplate
отправить одно сообщение. Я разрабатываю небольшую функцию, которая лежит за пределами реактивного подхода.
Я могу только найти примеры, которые используют @Ingoing
а также @Outgoing
из Смоллри, но мне не нужен KafkaStream
,
Я пытался с Kafka-CDI, но я не могу ввести SimpleKafkaProducer
,
Есть идеи?
Для ответа Климента
Кажется, правильное направление, но выполнение orders.send("hello");
Я получаю эту ошибку:
(vert.x-eventloop-thread-3) Unhandled exception:java.lang.IllegalStateException: Stream not yet connected
Из моей темы я использую командную строку, Кафка запущена и работает, если я создаю вручную, я могу видеть использованные сообщения.
Похоже, относительно этого предложения документа:
Чтобы использовать Emitter для потока hello, вам нужен @Incoming("hello") где-нибудь в вашем коде (или в вашей конфигурации).
У меня есть этот код в моем классе:
@Incoming("orders")
public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
return msg.ack();
}
Может быть, я забыл некоторые конфигурации?
1 ответ
Итак, вам просто нужно использовать Emitter
:
@Inject
@Stream("orders") // Emit on the channel 'orders'
Emitter<String> orders;
// ...
orders.send("hello");
И в твоем application.properties
, объявить:
## Orders topic (WRITE)
mp.messaging.outgoing.orders.type=io.smallrye.reactive.messaging.kafka.Kafka
mp.messaging.outgoing.orders.topic=orders
mp.messaging.outgoing.orders.bootstrap.servers=localhost:9092
mp.messaging.outgoing.orders.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.acks=1
Избежать Stream not yet connected
Исключение, как предполагает doc:
Чтобы использовать Emitter для потока hello, вам нужен @Incoming("hello") где-нибудь в вашем коде (или в вашей конфигурации).
Предполагая, что у вас есть что-то вроде этого в вашем application.properties:
# Orders topic (READ)
smallrye.messaging.source.orders-r-topic.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.orders-r-topic.topic=orders
smallrye.messaging.source.orders-r-topic.bootstrap.servers=0.0.0.0:9092
smallrye.messaging.source.orders-r-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.group.id=my-group-id
Добавьте что-то вроде этого:
@Incoming("orders-r-topic")
public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
return msg.ack();
}
Так как ответ Климент КНИГА
@Stream
аннотация устарела. В@Channel
вместо этого следует использовать аннотацию.
Вы можете использовать Emitter
предоставленный quarkus-smallrye-reactive-messaging-kafka
зависимость для создания сообщения в теме Kafka.
Простая реализация производителя Kafka:
public class MyKafkaProducer {
@Inject
@Channel("my-topic")
Emitter<String> myEmitter;
public void produce(String message) {
myEmitter.send(message);
}
}
И в файл application.properties необходимо добавить следующую конфигурацию:
mp.messaging.outgoing.my-topic.connector=smallrye-kafka
mp.messaging.outgoing.my-topic.bootstrap.servers=localhost:9092
mp.messaging.outgoing.my-topic.value.serializer=org.apache.kafka.common.serialization.StringSerializer
Это создаст строковые сериализованные сообщения для темы kafka с именем my-topic
.
Обратите внимание, что по умолчанию имя канала также является названием темы kafka, в которой будут создаваться данные. Это поведение можно изменить в конфигурации. Поддерживаются следующие атрибуты конфигурации описаны в реактивном сообщениями документации