Не удалось найти лидера для Set([topic,0]) с интеграцией Kafka-Spark

Я пытаюсь использовать SSL для интеграции Kafka-Spark. Я протестировал Kafka с включенным SSL, и он отлично работает с образцами потребителей и производителей.

Кроме того, я попробовал интеграцию Spark-Kafka, которая также работает без проблем, когда выполняется без SSL в spark-job.

Теперь, когда я включаю SSL в spark-job, я получаю исключение, и интеграция не работает.

Единственное изменение, которое я сделал, чтобы включить SSL в spark-job, - включить в свою работу следующие строки кода:

    sparkConf.set("security.protocol", "SSL");
    sparkConf.set("ssl.truststore.location", "PATH/truststore.jks");
    sparkConf.set("ssl.truststore.password", "passwrd");
    sparkConf.set("ssl.keystore.location", "PATH/keystore.jks");
    sparkConf.set("ssl.keystore.password", "kstore");
    sparkConf.set("ssl.key.password", "keypass");

И этот sparkConf передается при создании потокового контекста.

JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(10000));

И когда я запускаю работу, я получаю следующую ошибку:

17/05/24 18:16:39 WARN ConsumerFetcherManager$LeaderFinderThread: [test-consumer-group_bmj-cluster-1495664195784-5f49cbd0-leader-finder-thread], Failed to find leader for Set([bell,0])
java.lang.NullPointerException
    at org.apache.kafka.common.utils.Utils.formatAddress(Utils.java:312)
    at kafka.cluster.Broker.connectionString(Broker.scala:62)
    at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
    at kafka.client.ClientUtils$$anonfun$fetchTopicMetadata$5.apply(ClientUtils.scala:89)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:89)
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)

Версия Kafka - 2.11-0.10.2.0
Версия Spark - 2.1.0
Версия Scala - 2.11.8

Потоковые библиотеки

  <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming_2.10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka_2.10 -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.1.0</version>
    </dependency>

Любая помощь в отношении преодоления этой проблемы?

1 ответ

Решение

Немного покопавшись, я смог выяснить проблему, с которой столкнулся.

Прежде всего, чтобы включить SSL, связанный с SSL, необходимо передать kafka-params в метод KafkaUtils.createDirectStream(), а НЕ вsparkConfJavaStreamingContext.

Тогда заданыпараметры SSL:

"security.protocol", "SSL"
"ssl.truststore.location", "PATH/truststore.jks"
"ssl.truststore.password", "passwrd"
"ssl.keystore.location", "PATH/keystore.jks"
"ssl.keystore.password", "kstore"
"ssl.key.password", "keypass"

не поддерживаются версиейspark-kafka-streaming"0-8 _2.11", которую я использовал, поэтому мне пришлось изменить это на версию "0-10 _2.11".

Это, в свою очередь, имеет полное изменение API метода: KafkaUtils.createDirectStream(), который используется для подключения к Kafka.

Объяснение дано в документации относительно того, как использовать это здесь.

Итак, мой последний фрагмент кода для подключения к Kafka выглядит так:

final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    javaStreamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topicsCollection, kafkaParams)
            );

с kafka-params, являющимся картой, которая содержит все параметры SSL.

Спасибо
Shabir

Другие вопросы по тегам