Как получить смещения Кафки для структурированного запроса для ручного и надежного управления смещениями?

Spark 2.2 представил структурированный потоковый источник Kafka. Как я понимаю, он использует каталог контрольных точек HDFS для хранения смещений и гарантии доставки сообщений "точно один раз".

Но старые доки (такие как https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/) говорят, что контрольные точки Spark Streaming не восстанавливаются между приложениями или обновления Spark и, следовательно, не очень надежные. В качестве решения существует практика поддержки хранения смещений во внешнем хранилище, которое поддерживает транзакции, такие как MySQL или RedshiftDB.

Если я хочу сохранить смещения из источника Kafka в транзакционную БД, как я могу получить смещение из пакета структурированного потока?

Раньше это можно было сделать, приведя RDD к HasOffsetRanges:

val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges    

Но с новым потоковым API у меня есть Dataset из InternalRow и я не могу найти простой способ получить смещения. Sink API имеет только addBatch(batchId: Long, data: DataFrame) метод и как я могу предположить, чтобы получить смещение для данного идентификатора партии?

3 ответа

Решение

Соответствующая ветка обсуждения списка рассылки Spark DEV находится здесь.

Краткое изложение:

Spark Streaming будет поддерживать получение смещений в будущих версиях (> 2.2.0). Билет JIRA, чтобы следовать - https://issues-test.apache.org/jira/browse/SPARK-18258

Для Spark <= 2.2.0 вы можете получить смещения для данного пакета, прочитав json из каталога контрольных точек (API не стабилен, поэтому будьте осторожны):

val checkpointRoot = // read 'checkpointLocation' from custom sink params
val checkpointDir = new Path(new Path(checkpointRoot), "offsets").toUri.toString
val offsetSeqLog = new OffsetSeqLog(sparkSession, checkpointDir)

val endOffset: Map[TopicPartition, Long] = offsetSeqLog.get(batchId).map { endOffset =>
  endOffset.offsets.filter(_.isDefined).map { str =>
    JsonUtilsWrapper.jsonToOffsets(str.get.json)
  }
}


/**
  * Hack to access private API
  * Put this class into org.apache.spark.sql.kafka010 package
  */
object JsonUtilsWrapper {
  def offsetsToJson(partitionOffsets: Map[TopicPartition, Long]): String = {
    JsonUtils.partitionOffsets(partitionOffsets)
  }

  def jsonToOffsets(str: String): Map[TopicPartition, Long] = {
    JsonUtils.partitionOffsets(str)
  }
}

это endOffset будет содержать смещение до каждой темы / раздела. Получение начальных смещений проблематично, потому что вы должны прочитать каталог контрольной точки 'commit'. Но, как правило, вам не нужны начальные смещения, поскольку достаточно сохранить конечные смещения для надежного перезапуска задания Spark.

Обратите внимание, что вы также должны хранить обработанный идентификатор партии в своем хранилище. В некоторых случаях Spark может повторно запустить неудачный пакет с тем же идентификатором пакета, поэтому обязательно инициализируйте пользовательский приемник с идентификатором последней обработанной партии (который вы должны прочитать из внешнего хранилища) и проигнорируйте любой пакет с идентификатором

Spark 2.2 представил структурированный потоковый источник Kafka. Как я понимаю, для хранения смещений и гарантии доставки сообщений "точно в один момент" используется директория контрольной точки HDFS.

Правильный.

Каждый триггер Spark Structured Streaming будет сохранять смещения в offset каталог в местоположении контрольной точки (определяется с помощью checkpointLocation вариант или spark.sql.streaming.checkpointLocation Свойство Spark или назначенное случайным образом), которое должно гарантировать, что смещения обрабатываются не более одного раза. Функция называется Write Ahead Logs.

Другой каталог в местоположении контрольной точки commits каталог для завершенных потоковых пакетов с одним файлом на пакет (с именем файла, являющимся идентификатором пакета).

Цитирование официальной документации в семантике отказоустойчивости:

Чтобы достичь этого, мы разработали источники структурированной потоковой передачи, приемники и механизм выполнения, чтобы надежно отслеживать точный ход обработки, чтобы он мог обрабатывать любые виды сбоев путем перезапуска и / или повторной обработки. Предполагается, что каждый потоковый источник имеет смещения (аналогичные смещениям Кафки или порядковым номерам Kinesis) для отслеживания позиции чтения в потоке. Движок использует контрольные точки и журналы записи вперед для записи диапазона смещения данных, обрабатываемых в каждом триггере. Потоковые приемники спроектированы так, чтобы быть идепотентными для обработки переработки. Совместно, используя воспроизводимые источники и идемпотентные приемники, структурированная потоковая передача может обеспечить сквозную семантику "точно один раз" при любом сбое.

Каждый раз, когда запускается триггер StreamExecution проверяет каталоги и "вычисляет", какие смещения уже были обработаны. Это дает вам хотя бы один раз семантику и ровно один раз всего.

Но в старых документах (...) говорится, что контрольные точки Spark Streaming не восстанавливаются между приложениями или обновлениями Spark и, следовательно, не очень надежны.

Была причина, почему вы назвали их "старыми", не так ли?

Они ссылаются на старую и (на мой взгляд) мертвую потоковую передачу Spark, которая сохраняла не только смещения, но и весь код запроса, что приводило к ситуациям, когда контрольные точки были практически непригодны, например, когда вы изменяли код.

Времена истекли, и структурированная потоковая передача более осторожна в отношении того, что и когда нужно проверять.

Если я хочу сохранить смещения из источника Kafka в транзакционную БД, как я могу получить смещение из пакета структурированного потока?

Решением может быть реализация или использование интерфейса MetadataLog, который используется для обработки контрольных точек смещения. Это может сработать.

Как я могу предположить, чтобы получить смещение для данного идентификатора партии?

В настоящее время это невозможно.

Насколько я понимаю, вы не сможете сделать это, так как семантика потоковой передачи скрыта от вас. Вам просто не следует иметь дело с этой низкоуровневой "штукой", называемой смещениями, которую использует Spark Structured Streaming, чтобы предложить ровно один раз гарантии.

Цитирую Майкла Армбраста из его выступления на Spark Summit Простая, масштабируемая, отказоустойчивая обработка потока со структурированным потоком в Apache Spark:

Вы не должны рассуждать о потоковой передаче

и далее в разговоре (на следующем слайде):

Вы должны написать простые запросы и Spark должен постоянно обновлять ответ


Есть способ получить смещения (из любого источника, включая Кафку), используя StreamingQueryProgressчто вы можете перехватить с помощью StreamingQueryListener и onQueryProgressПерезвоните.

onQueryProgress (событие: QueryProgressEvent): модуль вызывается при обновлении состояния (обновленная скоростьзагрузки и т. д.)

С StreamingQueryProgress вы можете получить доступ sources свойство с SourceProgress, которое дает вам то, что вы хотите.

Потоковый набор данных с источником Кафки имеет offset как один из области. Вы можете просто запросить все смещения в запросе и сохранить их в JDBC Sink

Другие вопросы по тегам