Spark Streaming не учитывает auto.offset.reset="smalllest" или group.id?

Я создаю поток Кафки в Spark 1.6:

kafkaProps.setConfig("group.id", UUID.randomUUID().toString())

kafkaProps.createConfiguration()
kafkaProps.toImmutableMap.foreach {keyVal => println(keyVal._1 + "=" + keyVal._2)}

val ssc = new StreamingContext( sc, Seconds(2) ) 
val stream = ssc.createKafkaStream[String, String,  StringDeserializer, StringDeserializer](
                     kafkaProps,
                     List(kafkaProps.getConfig("kafka.topic"))
                     )
stream.print()

ssc.start()
ssc.awaitTerminationOrTimeout(30000)
ssc.stop(stopSparkContext=false, stopGracefully=true)

КафкаПропс содержит:

...
group.id=3abedbf7-2aed-436a-b4bc-0517a9c5c419
...
auto.offset.reset=smallest
...

Group.id меняет свое значение каждый раз, когда я запускаю код по желанию. Я думал, что этого было достаточно, чтобы сбрасывать смещение на ноль каждый раз, когда я запускаю свое приложение, когда оно использует Kafka, но старые элементы в теме не извлекаются.

Есть идеи?

2 ответа

Кажется, прошли годы с тех пор, как эта тема была поднята. Тем не менее, я хотел бы ответить за других людей, которые будут искать это в Google.

Ответ заключается в том, что streaming-kafka-010 игнорирует auto.offset.reset и просто устанавливает для него значение "none", если это не так.

Вы можете найти этот фрагмент кода в методе fixKafkaParams в org.apache.spark.streaming.kafka010.KafkaUtil.

logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

Дополнительное объяснение переопределения приводится в org.apache.spark.streaming.kafka010.KafkaRDD, где повторно проверяется значение auto.offset.rest.

require("none" ==    kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).asInstanceOf[String],
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG +
  " must be set to none for executor kafka params, else messages may not match offsetRange")

Как правило, auto.offset.reset не поддерживается в KafkaStreaming.

Я не могу найти определение метода StreamingContext#createKafkaStream, Не могли бы вы уточнить, где вы его нашли?

Попробуйте создать поток Kafka, используя KafkaUtils как указано в официальной документации Spark

val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])

Вы пробовали "самое раннее", а не "самое маленькое" в качестве значения?

https://kafka.apache.org/documentation

[последний, самый ранний, ни один] - это значения для нового потребителя

[наименьший, наибольший] значения для старого потребителя

Другие вопросы по тегам