Поглотить сообщение Avro от Кафки через Apache Camel

У меня есть маршрут Apache Camel, публикующий сообщение AVRO на тему Apache Kafka. Я только получил это, чтобы работать при установке свойства производителя 'serializerClass=kafka.serializer.StringEncoder'. В противном случае я получаю

java.lang.ClassCastException: java.lang.String не может быть приведен к [B в kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34) в kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler).scala:130) в kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125) в scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) в scala.col.mutable.WrappedArray.foreach(WrappedArray.scala:34) в kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125) в kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala. 52: at).Producer.send(Producer.scala:77) в kafka.javaapi.producer.Producer.send(Producer.scala:33) в org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:84)

С другой стороны, у меня есть второй маршрут Apache Camel, который якобы должен использовать из вышеупомянутой темы, который не работает

java.io.IOException: недопустимая длинная кодировка в org.apache.avro.io.BinaryDecoder.innerLongDecode(BinaryDecoder.java:217) в org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:176) в орг.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162) в org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) в org.apache.avro.generic.GenericD (Java:193) в org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) в org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) в org.ap.GenericDatumReader.read(GenericDatumReader.java:142) в org.apache.camel.dataformat.avro.AvroDataFormat.unmarshal(AvroDataFormat.java:133) в org.apache.camel.processor.UnmarshalProorma)

Вот код пользователя Apache Camel, который я использую:

        <route id="cassandra.publisher">
            <from
                uri="{{kafka.base.uri}}&amp;topic=sensordata&amp;groupId=Cassandra_ConsumerGroup&amp;consumerId=CassandraConsumer_Instance_1&amp;clientId=adapter2" />      
            <unmarshal>
                <custom ref="avroSensorData" />
            </unmarshal>

2 ответа

Решение

http://camel.465427.n5.nabble.com/Camel-Kafka-Component-td5749525.html

описывает, что Apache Camel версии 2.16.0/2.15.3 будет поддерживать различные типы данных, а не только сообщения String.

Как и было обещано, это было исправлено в Apache Camel 2.15.3 и исправлено в CAMEL-8790 ( https://issues.apache.org/jira/browse/CAMEL-8790).

Чтобы решить эту проблему, вы должны предоставить keyDeserializer и valueDeserializer для потребителя Camel Kafka следующим образом:

& keyDeserializer = org.apache.kafka.common.serialization.StringDeserializer & valueDeserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer

Другие вопросы по тегам