KafkaProducer как широковещательная переменная в интеграции Kafka-Spark

Я пытаюсь прочитать из Kafka и отправить данные в другую очередь Kakfa с помощью Spark.

Мой первоначальный подход - создать объект KafkaProducer для каждой записи в разделе RDD, и он работал нормально, но с точки зрения производительности это действительно плохо.

Поэтому я попытался использовать концепцию переменных Broadcast, чтобы сделать KakfaProducer в качестве переменной широковещания и передать его исполнителям. Это закончилось Исключением в потоке "main" com.esotericsoftware.kryo.Kryo Exception: java.util.ConcurrentModificationException

Пожалуйста, объясните или измените мой код для правильного использования и повышения производительности KakfaProducer.

   import java.io.Serializable;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;
    import java.util.Set;
    import java.util.TreeSet;
    import java.util.concurrent.Future;

    import org.apache.commons.configuration.ConfigurationConverter;
    import org.apache.commons.configuration.ConfigurationException;
    import org.apache.commons.configuration.PropertiesConfiguration;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.broadcast.Broadcast;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    import kafka.common.TopicAndPartition;
    import kafka.message.MessageAndMetadata;
    import kafka.serializer.StringDecoder;

    public class MyService implements Serializable {


        private static final long serialVersionUID = 1L;
        private PropertiesConfiguration props;
        private Producer<String, String> producer = null;
        private Future<RecordMetadata> receipt = null;
        private RecordMetadata receiptInfo = null;

        public void setProperties() {

            try {
                props = new PropertiesConfiguration("/conf/empty.properties");
            } catch (ConfigurationException e) {
                // TODO Auto-generated catch block
                System.out.println("Line 51");
                e.printStackTrace();
            }

            if (!props.containsKey("producer.topic")) {
                props.setProperty("producer.topic", "mytopic");
            }

            Properties producerprops = ConfigurationConverter.getProperties(props);

            producerprops.setProperty("bootstrap.servers", props.getString("target.bootstrap.servers"));
            producerprops.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            producerprops.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); // ????

            this.producer = new KafkaProducer<String, String>(producerprops);

        }

        public void sendmessage(String Value) {

            try {
                System.out.println("Line 111");

                String key = "xyz";

                if (Value.toString() == "20") {
                    receipt = producer
                            .send(new ProducerRecord<String, String>(props.getString("producer.topic"), key, Value));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }

        }


        public static void main(String[] args) {
            String topicNames = "mysourcetopic";
            Set<String> topicSet = new TreeSet<String>();
            for (String topic : topicNames.split(",")) {
                topicSet.add(topic.trim());
            }

            Map<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>();
            for (String topic : topicNames.split(",")) {
                for (int i = 0; i < 2; i++) {

                    TopicAndPartition tp = new TopicAndPartition(topic, i);
                    topicMap.put(tp, 0l);
                }
            }

            JavaSparkContext sparkConf = new JavaSparkContext("**************", "Kafka-Spark");

            MyService ec = new MyService();
            ec.setProperties();

            final Broadcast<Producer> bCastProducer = sparkConf.broadcast(ec.producer);

            sparkConf.getConf().set("spark.local.ip", "abcddd");

            sparkConf.getConf().set("spark.eventLog.enabled", "false");
            sparkConf.getConf().set("spark.shuffle.blockTransferService", "nio");

            JavaStreamingContext jsc = new JavaStreamingContext(sparkConf, new Duration(10000));

            Map<String, String> kafkaParams = new HashMap<String, String>();
            String pollInterval = "10000";
            String zookeeper = "xyzzz";
            int partition = 1;
            kafkaParams.put("metadata.broker.list", "xyzzz");
            kafkaParams.put("group.id", "Consumer");
            kafkaParams.put("client.id", "Consumer");
            kafkaParams.put("zookeeper.connect", zookeeper);
            JavaInputDStream<String> dfs = KafkaUtils.createDirectStream(jsc, String.class, String.class,
                    StringDecoder.class, StringDecoder.class, String.class, kafkaParams, topicMap,
                    (Function<MessageAndMetadata<String, String>, String>) MessageAndMetadata::message);

            dfs.foreachRDD(rdd -> {
                if (rdd.isEmpty()) {
                    return;
                }

                rdd.foreachPartition(itr -> {
                    try {
                        // System.out.println("231");

                        while (itr.hasNext()) {
                            ec.sendmessage(itr.next()); // Produce

                        }

                    } catch (Exception e) {
                    }
                });
            });
            jsc.start();
            jsc.awaitTermination();
        }

    }

1 ответ

См. Мой ответ по существу на тот же вопрос ( Как писать в Kafka из Spark Streaming) из предыдущего обсуждения Stack Overflow.