Использование kryo для пользовательской сериализации Java-объекта в Kafka
Я пытаюсь отправить объект Java с помощью сериализатора объекта Kryo. Я сталкиваюсь с несколькими проблемами. Во-первых, я не могу расширить kryo для сериализации Java-бина, который является моей полезной нагрузкой. Вот пользовательский сериализатор.
package com.test.kafka.serialization;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.ByteBufferInput;
import com.esotericsoftware.kryo.io.ByteBufferOutput;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.test.kafka.TextAnalysisRequest;
public class KryoReadingSerializer implements Closeable, AutoCloseable, Serializer, Deserializer {
private ThreadLocal kryos = new ThreadLocal() {
protected Kryo initialValue() {
Kryo kryo = new Kryo();
kryo.addDefaultSerializer(TextAnalysisRequest.class, new KryoInternalSerializer());
return kryo;
};
};
@Override
public void configure(Map map, boolean b) {
}
@Override
public byte[] serialize(String s, TextAnalysisRequest ta) {
ByteBufferOutput output = new ByteBufferOutput(100);
kryos.get().writeObject(output, ta);
return output.toBytes();
}
@Override
public TextAnalysisRequest deserialize(String s, byte[] bytes) {
try {
return kryos.get().readObject(new ByteBufferInput(bytes), TextAnalysisRequest.class);
}
catch(Exception e) {
throw new IllegalArgumentException("Error reading bytes",e);
}
}
/* (non-Javadoc)
* @see java.io.Closeable#close()
*/
@Override
public void close() {
// TODO Auto-generated method stub
}
private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer {
@Override
public void write(Kryo kryo, Output output, TextAnalysisRequest ta) {
System.out.println("Kryo write....");
kryo.writeClassAndObject(output, ta);
}
@Override
public TextAnalysisRequest read(Kryo kryo, Input input, Class aClass) {
System.out.println("Kryo read....");
TextAnalysisRequest ta = null;
ta = (TextAnalysisRequest)kryo.readClassAndObject(input);
return ta;
}
}
}
Часть публикации работает нормально, но во время чтения происходит сбой. При чтении выдается следующая ошибка. Это как бы идет по бесконечному циклу и продолжает выдавать это исключение.
Error -->org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition full-3 at offset 321
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition full-3 at offset 321
Caused by: java.lang.IllegalArgumentException: Error reading bytes
at com.test.kafka.serialization.KryoReadingSerializer.deserialize(KryoReadingSerializer.java:55)
at com.test.kafka.serialization.KryoReadingSerializer.deserialize(KryoReadingSerializer.java:1)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:627)
at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:548)
at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
at com.test.kafka.consumer.ConsumerGroupSerializerObject1.run(ConsumerGroupSerializerObject1.java:68)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
at com.test.kafka.serialization.KryoReadingSerializer$KryoInternalSerializer.read(KryoReadingSerializer.java:102)
at com.test.kafka.serialization.KryoReadingSerializer$KryoInternalSerializer.read(KryoReadingSerializer.java:1)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
at com.test.kafka.serialization.KryoReadingSerializer.deserialize(KryoReadingSerializer.java:52)
... 12 more
Вот мой объект Java:
public class TextAnalysisRequest implements Serializable {
public String url;
public String source; // corresponding to the Source2 assigned in Velocity collection
public String content;
public Tag[] tags;
}
public class Tag implements Serializable {
public String name;
public String value;
public String weight;
public Attribute[] attribute;
}
Вторая часть проблемы касается потребителя. Я использую Consumer Group и KafkaStream для чтения данных из очереди. Он работает для примитивного типа, но не уверен, как читать объект Java. Вот код:
public class ConsumerGroupExample {
private final ConsumerConnector consumer;
private final String topic;
private ExecutorService executor;
public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
createConsumerConfig(a_zookeeper, a_groupId));
this.topic = a_topic;
}
public void shutdown() {
if (consumer != null) consumer.shutdown();
if (executor != null) executor.shutdown();
try {
if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
}
} catch (InterruptedException e) {
System.out.println("Interrupted during shutdown, exiting uncleanly");
}
}
public void run(int a_numThreads) {
Map topicCountMap = new HashMap();
topicCountMap.put(topic, new Integer(a_numThreads));
Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
List> streams = consumerMap.get(topic);
executor = Executors.newFixedThreadPool(a_numThreads);
int threadNumber = 0;
for (final KafkaStream stream : streams) {
executor.submit(new ConsumerGroupObject(stream, threadNumber));
threadNumber++;
}
}
private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
Properties props = new Properties();
props.put("zookeeper.connect", a_zookeeper);
props.put("group.id", a_groupId);
props.put("zookeeper.session.timeout.ms", "400");
props.put("zookeeper.sync.time.ms", "200");
props.put("auto.commit.interval.ms", "1000");
props.put("auto.offset.reset", "smallest");
return new ConsumerConfig(props);
}
public static void main(String[] args) {
String zooKeeper = args[0];
String groupId = args[1];
String topic = args[2];
int threads = Integer.parseInt(args[3]);
ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
example.run(threads);
try {
Thread.sleep(10000);
} catch (InterruptedException ie) {
}
}
}
ConsumerGroupObject
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
public class ConsumerGroupObject implements Runnable {
private KafkaStream m_stream;
private int m_threadNumber;
public ConsumerGroupObject(KafkaStream a_stream, int a_threadNumber) {
m_threadNumber = a_threadNumber;
m_stream = a_stream;
}
public void run() {
ConsumerIterator it = m_stream.iterator();
while (it.hasNext()){
System.out.println("######## Thread " + m_threadNumber + ": " + new String(it.next().message()));
}
}
}
Здесь я пытаюсь выяснить, как извлечь объект TextAnalysisRequest и его вложенный Java-объект через ConsumerIterator. Извините за огромный пост, я новичок в kafka, поэтому хотел убедиться, что предоставить достаточно информации.