Spark Streaming Kafka противодавление

У нас есть приложение Spark Streaming, которое считывает данные из очереди Kafka в приемнике и выполняет некоторые преобразования и вывод в HDFS. Интервал между партиями составляет 1 мин, мы уже настроили обратное давление и spark.streaming.receiver.maxRate параметры, поэтому он работает нормально большую часть времени.

Но у нас все еще есть одна проблема. Когда HDFS полностью не работает, пакетное задание будет зависать в течение длительного времени (скажем, HDFS не работает в течение 4 часов, а задание будет зависать в течение 4 часов), но получатель не знает, что задание не завершено Таким образом, он все еще получает данные в течение следующих 4 часов. Это вызывает исключение OOM, и все приложение не работает, мы потеряли много данных.

Итак, мой вопрос: возможно ли сообщить получателю, что задание не заканчивается, поэтому он получит меньше (или даже вообще не получит) данных, а когда задание завершится, он начнет получать больше данных, чтобы наверстать упущенное. В вышеупомянутом состоянии, когда HDFS не работает, приемник будет читать меньше данных из Kafka, и блок, сгенерированный в течение следующих 4 часов, действительно мал, приемник и все приложение не работают, после того, как HDFS в порядке, приемник будет читать больше данных и начать догонять.

1 ответ

Вы можете включить противодавление, установив свойство spark.streaming.backpressure.enabled=true, Это будет динамически изменять размеры вашего пакета и позволит избежать ситуаций, когда вы получаете OOM от наращивания очереди. У него есть несколько параметров:

  • spark.streaming.backpressure.pid.proportional - сигнал ответа на ошибку в последнем размере пакета (по умолчанию 1.0)
  • spark.streaming.backpressure.pid.integral - сигнал ответа на накопленную ошибку - фактически демпфер (по умолчанию 0,2)
  • spark.streaming.backpressure.pid.derived - ответ на ошибочный тренд (полезно для быстрого реагирования на изменения, по умолчанию 0.0)
  • spark.streaming.backpressure.pid.minRate - минимальная скорость, определяемая частотой вашего пакета, измените ее, чтобы уменьшить недопоставку в заданиях с высокой пропускной способностью (по умолчанию 100)

Значения по умолчанию довольно хорошие, но я смоделировал реакцию алгоритма на различные параметры здесь

Другие вопросы по тегам