Как перематывать смещения Kafka в потоковом чтении с искровой структурой

У меня есть задание Spark Structured Streaming, которое настроено на чтение данных из Kafka. Пожалуйста, пройдите код, чтобы проверить readStream() с параметрами для чтения последних данных из Кафки.

Я это понимаю readStream() читает с первого смещения при запуске нового запроса, а не при возобновлении.

Но я не знаю, как начать новый запрос каждый раз, когда перезапускаю свою работу в IntelliJ.

val kafkaStreamingDF = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", AppProperties.getProp(AppConstants.PROPS_SERVICES_KAFKA_SERVERS))
  .option("subscribe", AppProperties.getProp(AppConstants.PROPS_SDV_KAFKA_TOPICS))
  .option("failOnDataLoss", "false")
  .option("startingOffsets","earliest")
  .load()
  .selectExpr("CAST(value as STRING)", "CAST(topic as STRING)")

Я также попытался установить смещения с помощью """{"topicA":{"0":0,"1":0}}"""

Ниже мой писательский поток

val query = kafkaStreamingDF
  .writeStream
  .format("console")
  .start()

Каждый раз, когда я перезагружаю свою работу в IntelliJ IDE, журналы показывают, что смещение было установлено на самое позднее, а не на 0 или самое раннее.

Есть ли способ, которым я могу очистить свою контрольную точку, в этом случае я не знаю, где находится каталог контрольных точек, потому что в вышеупомянутом случае я не указываю какую-либо контрольную точку.

2 ответа

Кафка полагается на собственность auto.offset.reset позаботиться об офсетном управлении.

По умолчанию используется "последний", что означает, что при отсутствии действительного смещения потребитель начнет чтение из самых новых записей (записей, которые были записаны после того, как потребитель начал работать). Альтернатива "самая ранняя", что означает, что при отсутствии действительного смещения потребитель будет считывать все данные в разделе, начиная с самого начала.

По вашему вопросу вы хотите прочитать все данные из темы. Так что постановкаstartingOffsets"до"earliest"должно работать. Но также убедитесь, что вы устанавливаете enable.auto.commit ложно.

Установив enable.auto.commit в true означает, что смещения фиксируются автоматически с частотой, контролируемой конфигурацией auto.commit.interval.ms,

Если для этого параметра установлено значение true, то смещения автоматически отправляются в Kafka, когда сообщения считываются из Kafka, что не обязательно означает, что Spark завершил обработку этих сообщений. Чтобы включить точное управление для фиксации смещений, установите параметр Kafka enable.auto.commit в false,

Попробуйте настроить .option("kafka.client.id", "XX"), чтобы использовать другой client.id,

Другие вопросы по тегам