Как ввести KafkaTemplate в Quarkus

Я пытаюсь ввести KafkaTemplate отправить одно сообщение. Я разрабатываю небольшую функцию, которая лежит за пределами реактивного подхода.

Я могу только найти примеры, которые используют @Ingoing а также @Outgoing из Смоллри, но мне не нужен KafkaStream,

Я пытался с Kafka-CDI, но я не могу ввести SimpleKafkaProducer,

Есть идеи?

Для ответа Климента

Кажется, правильное направление, но выполнение orders.send("hello"); Я получаю эту ошибку:

(vert.x-eventloop-thread-3) Unhandled exception:java.lang.IllegalStateException: Stream not yet connected

Из моей темы я использую командную строку, Кафка запущена и работает, если я создаю вручную, я могу видеть использованные сообщения.

Похоже, относительно этого предложения документа:

Чтобы использовать Emitter для потока hello, вам нужен @Incoming("hello") где-нибудь в вашем коде (или в вашей конфигурации).

У меня есть этот код в моем классе:

    @Incoming("orders")
    public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
        log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
        return msg.ack();
    }

Может быть, я забыл некоторые конфигурации?

1 ответ

Решение

Итак, вам просто нужно использовать Emitter:

@Inject
@Stream("orders") // Emit on the channel 'orders'
Emitter<String> orders;

// ...
orders.send("hello");

И в твоем application.properties, объявить:

## Orders topic (WRITE)
mp.messaging.outgoing.orders.type=io.smallrye.reactive.messaging.kafka.Kafka
mp.messaging.outgoing.orders.topic=orders
mp.messaging.outgoing.orders.bootstrap.servers=localhost:9092
mp.messaging.outgoing.orders.key.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.value.serializer=org.apache.kafka.common.serialization.StringSerializer
mp.messaging.outgoing.orders.acks=1

Избежать Stream not yet connected Исключение, как предполагает doc:

Чтобы использовать Emitter для потока hello, вам нужен @Incoming("hello") где-нибудь в вашем коде (или в вашей конфигурации).

Предполагая, что у вас есть что-то вроде этого в вашем application.properties:

# Orders topic (READ)
smallrye.messaging.source.orders-r-topic.type=io.smallrye.reactive.messaging.kafka.Kafka
smallrye.messaging.source.orders-r-topic.topic=orders
smallrye.messaging.source.orders-r-topic.bootstrap.servers=0.0.0.0:9092
smallrye.messaging.source.orders-r-topic.key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
smallrye.messaging.source.orders-r-topic.group.id=my-group-id

Добавьте что-то вроде этого:

@Incoming("orders-r-topic")
public CompletionStage<Void> consume(KafkaMessage<String, String> msg) {
    log.info("Received message (topic: {}, partition: {}) with key {}: {}", msg.getTopic(), msg.getPartition(), msg.getKey(), msg.getPayload());
    return msg.ack();
}

Так как ответ Климент КНИГА @Stream аннотация устарела. В @Channel вместо этого следует использовать аннотацию.

Вы можете использовать Emitter предоставленный quarkus-smallrye-reactive-messaging-kafka зависимость для создания сообщения в теме Kafka.

Простая реализация производителя Kafka:

public class MyKafkaProducer {

    @Inject
    @Channel("my-topic")
    Emitter<String> myEmitter;

    public void produce(String message) {
      myEmitter.send(message);
    }
}

И в файл application.properties необходимо добавить следующую конфигурацию:

mp.messaging.outgoing.my-topic.connector=smallrye-kafka
mp.messaging.outgoing.my-topic.bootstrap.servers=localhost:9092
mp.messaging.outgoing.my-topic.value.serializer=org.apache.kafka.common.serialization.StringSerializer

Это создаст строковые сериализованные сообщения для темы kafka с именем my-topic.

Обратите внимание, что по умолчанию имя канала также является названием темы kafka, в которой будут создаваться данные. Это поведение можно изменить в конфигурации. Поддерживаются следующие атрибуты конфигурации описаны в реактивном сообщениями документации

Другие вопросы по тегам