Как получить строковые сообщения из необработанных объектов буфера Kafka Java

Есть продюсер nodejs Kafka, который отправляет содержимое файла в Kafka. Когда потребитель Kafka принимает сообщения от Kafka, это выглядит так:

{"type":"Buffer","data":[91,13 .....]

Когда я использовал m.message.value.toString('utf8') в nodejs-кафке-потребителе печатается фактическое сообщение. Но мне нужно потреблять в java Kafka потребителя. Я пробовал с property.put("value.serializer.encoding", "utf8") а также new String(consumerRecord.value()) но все еще печатать {"type":"Buffer","data":[91,13 .....], Мой вопрос заключается в том, как использовать сообщение в строковом формате в Java, которое производится производителем Kjjka nodejs.

1 ответ

Вы должны использовать правильный сериализатор ключа и значения в вызове API производителя и потребителя.

Kafka предоставляет некоторые ключи по умолчанию и значения serilizer, такие как StringSerializer и т. Д.

Строковый сериализатор для создания и потребления строковых сообщений.

props.put ("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put ("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Например, вы можете сослаться на документ производителя и потребителя.

https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html https://kafka.apache.org/10/javadoc/?org/apache/kafka/clients/consumer/KafkaConsumer.html

Реестр схемы Поскольку вы отправляете сообщение из nodejs и используете его с помощью потребителя kafka, вы должны предоставить тот же ключ и значение средства для удаления сообщений в приемнике java kafka.

Существует один вариант решения этой несовместимой проблемы - использовать schema-registry для хранения схемы нашего сообщения с использованием kafka avro. Затем мы можем использовать эту схему в nodejs-kafka-производителе и java-kafka-потребителе для получения сообщения.

Производитель Nodejs kafka с примером avro

Вот пример nodejs kafka avro

Потребитель Java Kafka, использующий пример реестра схемы

Другие вопросы по тегам