Сохранить смещение сообщения в Kafka с помощью KafkaUtils.createDirectStream

Как сохранить смещение сообщения в Kafka, если я использую KafkaUtils.createDirectStream для чтения сообщений. Кафка теряет значение смещения каждый раз, когда приложение выходит из строя. Затем оно читает значение, предоставленное в auto.offset.reset(которое является самым последним), и не может прочитать сообщения в интервале остановки и запуска приложения.

1 ответ

Вы можете избежать этого, вручную фиксируя смещение. Задайте для enable.auto.commit значение false, а затем используйте приведенный ниже код, чтобы зафиксировать смещение в kafka после успешной операции.

  var offsetRanges = Array[OffsetRange]()

          val valueStream = stream.transform {
            rdd =>
              offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
              rdd
          }.map(_.value())
//operation
        stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)

Вы также можете прочитать этот документ, который даст вам хорошее понимание управления смещением https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/

Другие вопросы по тегам