Spark Streaming не учитывает auto.offset.reset="smalllest" или group.id?
Я создаю поток Кафки в Spark 1.6:
kafkaProps.setConfig("group.id", UUID.randomUUID().toString())
kafkaProps.createConfiguration()
kafkaProps.toImmutableMap.foreach {keyVal => println(keyVal._1 + "=" + keyVal._2)}
val ssc = new StreamingContext( sc, Seconds(2) )
val stream = ssc.createKafkaStream[String, String, StringDeserializer, StringDeserializer](
kafkaProps,
List(kafkaProps.getConfig("kafka.topic"))
)
stream.print()
ssc.start()
ssc.awaitTerminationOrTimeout(30000)
ssc.stop(stopSparkContext=false, stopGracefully=true)
КафкаПропс содержит:
...
group.id=3abedbf7-2aed-436a-b4bc-0517a9c5c419
...
auto.offset.reset=smallest
...
Group.id меняет свое значение каждый раз, когда я запускаю код по желанию. Я думал, что этого было достаточно, чтобы сбрасывать смещение на ноль каждый раз, когда я запускаю свое приложение, когда оно использует Kafka, но старые элементы в теме не извлекаются.
Есть идеи?
2 ответа
Кажется, прошли годы с тех пор, как эта тема была поднята. Тем не менее, я хотел бы ответить за других людей, которые будут искать это в Google.
Ответ заключается в том, что streaming-kafka-010 игнорирует auto.offset.reset и просто устанавливает для него значение "none", если это не так.
Вы можете найти этот фрагмент кода в методе fixKafkaParams в org.apache.spark.streaming.kafka010.KafkaUtil.
logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
Дополнительное объяснение переопределения приводится в org.apache.spark.streaming.kafka010.KafkaRDD, где повторно проверяется значение auto.offset.rest.
require("none" == kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
" must be set to none for executor kafka params, else messages may not match offsetRange")
Как правило, auto.offset.reset не поддерживается в KafkaStreaming.
Я не могу найти определение метода StreamingContext#createKafkaStream
, Не могли бы вы уточнить, где вы его нашли?
Попробуйте создать поток Kafka, используя KafkaUtils
как указано в официальной документации Spark
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
Вы пробовали "самое раннее", а не "самое маленькое" в качестве значения?
https://kafka.apache.org/documentation
[последний, самый ранний, ни один] - это значения для нового потребителя
[наименьший, наибольший] значения для старого потребителя