Поглотить сообщение Avro от Кафки через Apache Camel
У меня есть маршрут Apache Camel, публикующий сообщение AVRO на тему Apache Kafka. Я только получил это, чтобы работать при установке свойства производителя 'serializerClass=kafka.serializer.StringEncoder'. В противном случае я получаю
java.lang.ClassCastException: java.lang.String не может быть приведен к [B в kafka.serializer.DefaultEncoder.toBytes(Encoder.scala:34) в kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler).scala:130) в kafka.producer.async.DefaultEventHandler$$anonfun$serialize$1.apply(DefaultEventHandler.scala:125) в scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) в scala.col.mutable.WrappedArray.foreach(WrappedArray.scala:34) в kafka.producer.async.DefaultEventHandler.serialize(DefaultEventHandler.scala:125) в kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala. 52: at).Producer.send(Producer.scala:77) в kafka.javaapi.producer.Producer.send(Producer.scala:33) в org.apache.camel.component.kafka.KafkaProducer.process(KafkaProducer.java:84)
С другой стороны, у меня есть второй маршрут Apache Camel, который якобы должен использовать из вышеупомянутой темы, который не работает
java.io.IOException: недопустимая длинная кодировка в org.apache.avro.io.BinaryDecoder.innerLongDecode(BinaryDecoder.java:217) в org.apache.avro.io.BinaryDecoder.readLong(BinaryDecoder.java:176) в орг.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162) в org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:160) в org.apache.avro.generic.GenericD (Java:193) в org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:183) в org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151) в org.ap.GenericDatumReader.read(GenericDatumReader.java:142) в org.apache.camel.dataformat.avro.AvroDataFormat.unmarshal(AvroDataFormat.java:133) в org.apache.camel.processor.UnmarshalProorma)
Вот код пользователя Apache Camel, который я использую:
<route id="cassandra.publisher">
<from
uri="{{kafka.base.uri}}&topic=sensordata&groupId=Cassandra_ConsumerGroup&consumerId=CassandraConsumer_Instance_1&clientId=adapter2" />
<unmarshal>
<custom ref="avroSensorData" />
</unmarshal>
2 ответа
http://camel.465427.n5.nabble.com/Camel-Kafka-Component-td5749525.html
описывает, что Apache Camel версии 2.16.0/2.15.3 будет поддерживать различные типы данных, а не только сообщения String.
Как и было обещано, это было исправлено в Apache Camel 2.15.3 и исправлено в CAMEL-8790 ( https://issues.apache.org/jira/browse/CAMEL-8790).
Чтобы решить эту проблему, вы должны предоставить keyDeserializer и valueDeserializer для потребителя Camel Kafka следующим образом:
& keyDeserializer = org.apache.kafka.common.serialization.StringDeserializer & valueDeserializer = org.apache.kafka.common.serialization.ByteArrayDeserializer