Сохранить смещение сообщения в Kafka с помощью KafkaUtils.createDirectStream
Как сохранить смещение сообщения в Kafka, если я использую KafkaUtils.createDirectStream для чтения сообщений. Кафка теряет значение смещения каждый раз, когда приложение выходит из строя. Затем оно читает значение, предоставленное в auto.offset.reset(которое является самым последним), и не может прочитать сообщения в интервале остановки и запуска приложения.
1 ответ
Вы можете избежать этого, вручную фиксируя смещение. Задайте для enable.auto.commit значение false, а затем используйте приведенный ниже код, чтобы зафиксировать смещение в kafka после успешной операции.
var offsetRanges = Array[OffsetRange]()
val valueStream = stream.transform {
rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map(_.value())
//operation
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
Вы также можете прочитать этот документ, который даст вам хорошее понимание управления смещением https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/