Использование kryo для пользовательской сериализации Java-объекта в Kafka

Я пытаюсь отправить объект Java с помощью сериализатора объекта Kryo. Я сталкиваюсь с несколькими проблемами. Во-первых, я не могу расширить kryo для сериализации Java-бина, который является моей полезной нагрузкой. Вот пользовательский сериализатор.


package com.test.kafka.serialization;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Closeable;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Map;

import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.io.ByteBufferInput;
import com.esotericsoftware.kryo.io.ByteBufferOutput;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.test.kafka.TextAnalysisRequest;

public class KryoReadingSerializer implements Closeable, AutoCloseable, Serializer, Deserializer {
    private ThreadLocal kryos = new ThreadLocal() {
        protected Kryo initialValue() {
            Kryo kryo = new Kryo();
            kryo.addDefaultSerializer(TextAnalysisRequest.class, new KryoInternalSerializer());
            return kryo;
        };
    };

    @Override
    public void configure(Map map, boolean b) {
    }

    @Override
    public byte[] serialize(String s, TextAnalysisRequest ta) {
        ByteBufferOutput output = new ByteBufferOutput(100);
        kryos.get().writeObject(output, ta);
        return output.toBytes();
    }

    @Override
    public TextAnalysisRequest deserialize(String s, byte[] bytes) {
        try {
            return kryos.get().readObject(new ByteBufferInput(bytes), TextAnalysisRequest.class);
        }
        catch(Exception e) {
            throw new IllegalArgumentException("Error reading bytes",e);
        }
    }

    /* (non-Javadoc)
     * @see java.io.Closeable#close()
     */
     @Override
     public void close() {
         // TODO Auto-generated method stub

     }

     private static class KryoInternalSerializer extends com.esotericsoftware.kryo.Serializer {
         @Override
         public void write(Kryo kryo, Output output, TextAnalysisRequest ta) {
             System.out.println("Kryo write....");
             kryo.writeClassAndObject(output, ta);
         }

         @Override
         public TextAnalysisRequest read(Kryo kryo, Input input, Class aClass) {
             System.out.println("Kryo read....");
             TextAnalysisRequest ta = null;
             ta = (TextAnalysisRequest)kryo.readClassAndObject(input);
             return ta;
         }
     }
}

Часть публикации работает нормально, но во время чтения происходит сбой. При чтении выдается следующая ошибка. Это как бы идет по бесконечному циклу и продолжает выдавать это исключение.


 Error -->org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition full-3 at offset 321
org.apache.kafka.common.errors.SerializationException: Error deserializing key/value for partition full-3 at offset 321
Caused by: java.lang.IllegalArgumentException: Error reading bytes
    at com.test.kafka.serialization.KryoReadingSerializer.deserialize(KryoReadingSerializer.java:55)
    at com.test.kafka.serialization.KryoReadingSerializer.deserialize(KryoReadingSerializer.java:1)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseRecord(Fetcher.java:627)
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:548)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at com.test.kafka.consumer.ConsumerGroupSerializerObject1.run(ConsumerGroupSerializerObject1.java:68)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 13994
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
    at com.test.kafka.serialization.KryoReadingSerializer$KryoInternalSerializer.read(KryoReadingSerializer.java:102)
    at com.test.kafka.serialization.KryoReadingSerializer$KryoInternalSerializer.read(KryoReadingSerializer.java:1)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:657)
    at com.test.kafka.serialization.KryoReadingSerializer.deserialize(KryoReadingSerializer.java:52)
    ... 12 more

Вот мой объект Java:


public class TextAnalysisRequest implements Serializable {
    public String url;
    public String source; // corresponding to the Source2 assigned in Velocity collection
    public String content;
    public Tag[] tags;
}

public class Tag implements Serializable {
    public String name;
    public String value;
    public String weight;
    public Attribute[] attribute;
}

Вторая часть проблемы касается потребителя. Я использую Consumer Group и KafkaStream для чтения данных из очереди. Он работает для примитивного типа, но не уверен, как читать объект Java. Вот код:


public class ConsumerGroupExample {

    private final ConsumerConnector consumer;
    private final String topic;
    private  ExecutorService executor;

    public ConsumerGroupExample(String a_zookeeper, String a_groupId, String a_topic) {
        consumer = kafka.consumer.Consumer.createJavaConsumerConnector(
                createConsumerConfig(a_zookeeper, a_groupId));
        this.topic = a_topic;
    }

    public void shutdown() {
        if (consumer != null) consumer.shutdown();
        if (executor != null) executor.shutdown();
        try {
            if (!executor.awaitTermination(5000, TimeUnit.MILLISECONDS)) {
                System.out.println("Timed out waiting for consumer threads to shut down, exiting uncleanly");
            }
        } catch (InterruptedException e) {
            System.out.println("Interrupted during shutdown, exiting uncleanly");
        }
    }

    public void run(int a_numThreads) {
        Map topicCountMap = new HashMap();
        topicCountMap.put(topic, new Integer(a_numThreads));
        Map>> consumerMap = consumer.createMessageStreams(topicCountMap);
        List> streams = consumerMap.get(topic);
        executor = Executors.newFixedThreadPool(a_numThreads);
        int threadNumber = 0;
        for (final KafkaStream stream : streams) {
            executor.submit(new ConsumerGroupObject(stream, threadNumber));
            threadNumber++;
        }
    }

    private static ConsumerConfig createConsumerConfig(String a_zookeeper, String a_groupId) {
        Properties props = new Properties();
        props.put("zookeeper.connect", a_zookeeper);
        props.put("group.id", a_groupId);
        props.put("zookeeper.session.timeout.ms", "400");
        props.put("zookeeper.sync.time.ms", "200");
        props.put("auto.commit.interval.ms", "1000");
        props.put("auto.offset.reset", "smallest");

        return new ConsumerConfig(props);
    }

    public static void main(String[] args) {
        String zooKeeper = args[0];
        String groupId = args[1];
        String topic = args[2];
        int threads = Integer.parseInt(args[3]);

        ConsumerGroupExample example = new ConsumerGroupExample(zooKeeper, groupId, topic);
        example.run(threads);
        try {
            Thread.sleep(10000);
        } catch (InterruptedException ie) {

        }
    }
}

ConsumerGroupObject


import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;

public class ConsumerGroupObject implements Runnable {
    private KafkaStream m_stream;
    private int m_threadNumber;

    public ConsumerGroupObject(KafkaStream a_stream, int a_threadNumber) {
        m_threadNumber = a_threadNumber;
        m_stream = a_stream;
    }

    public void run() {
        ConsumerIterator it = m_stream.iterator();
        while (it.hasNext()){
            System.out.println("######## Thread " + m_threadNumber + ": " + new String(it.next().message()));
        }
    }
}

Здесь я пытаюсь выяснить, как извлечь объект TextAnalysisRequest и его вложенный Java-объект через ConsumerIterator. Извините за огромный пост, я новичок в kafka, поэтому хотел убедиться, что предоставить достаточно информации.

0 ответов

Другие вопросы по тегам