Spark Streaming Kafka противодавление
У нас есть приложение Spark Streaming, которое считывает данные из очереди Kafka в приемнике и выполняет некоторые преобразования и вывод в HDFS. Интервал между партиями составляет 1 мин, мы уже настроили обратное давление и spark.streaming.receiver.maxRate
параметры, поэтому он работает нормально большую часть времени.
Но у нас все еще есть одна проблема. Когда HDFS полностью не работает, пакетное задание будет зависать в течение длительного времени (скажем, HDFS не работает в течение 4 часов, а задание будет зависать в течение 4 часов), но получатель не знает, что задание не завершено Таким образом, он все еще получает данные в течение следующих 4 часов. Это вызывает исключение OOM, и все приложение не работает, мы потеряли много данных.
Итак, мой вопрос: возможно ли сообщить получателю, что задание не заканчивается, поэтому он получит меньше (или даже вообще не получит) данных, а когда задание завершится, он начнет получать больше данных, чтобы наверстать упущенное. В вышеупомянутом состоянии, когда HDFS не работает, приемник будет читать меньше данных из Kafka, и блок, сгенерированный в течение следующих 4 часов, действительно мал, приемник и все приложение не работают, после того, как HDFS в порядке, приемник будет читать больше данных и начать догонять.
1 ответ
Вы можете включить противодавление, установив свойство spark.streaming.backpressure.enabled=true
, Это будет динамически изменять размеры вашего пакета и позволит избежать ситуаций, когда вы получаете OOM от наращивания очереди. У него есть несколько параметров:
- spark.streaming.backpressure.pid.proportional - сигнал ответа на ошибку в последнем размере пакета (по умолчанию 1.0)
- spark.streaming.backpressure.pid.integral - сигнал ответа на накопленную ошибку - фактически демпфер (по умолчанию 0,2)
- spark.streaming.backpressure.pid.derived - ответ на ошибочный тренд (полезно для быстрого реагирования на изменения, по умолчанию 0.0)
- spark.streaming.backpressure.pid.minRate - минимальная скорость, определяемая частотой вашего пакета, измените ее, чтобы уменьшить недопоставку в заданиях с высокой пропускной способностью (по умолчанию 100)
Значения по умолчанию довольно хорошие, но я смоделировал реакцию алгоритма на различные параметры здесь