Когда аккумуляторы действительно надежны?
Я хочу использовать аккумулятор для сбора статистики о данных, которыми я манипулирую в работе Spark. В идеале, я бы сделал это, пока задание вычисляет требуемые преобразования, но поскольку Spark будет повторно вычислять задачи в разных случаях, аккумуляторы не будут отражать истинные метрики. Вот как документация описывает это:
Для обновлений аккумулятора, выполняемых только внутри действий, Spark гарантирует, что обновление каждого задания к аккумулятору будет применено только один раз, т.е. перезапущенные задачи не будут обновлять значение. В преобразованиях пользователи должны знать, что обновление каждой задачи может применяться более одного раза, если задачи или этапы задания выполняются повторно.
Это сбивает с толку, так как большинство действий не позволяют запускать пользовательский код (где могут использоваться аккумуляторы), они в основном принимают результаты предыдущих преобразований (лениво). Документация также показывает это:
val acc = sc.accumulator(0)
data.map(x => acc += x; f(x))
// Here, acc is still 0 because no actions have cause the `map` to be computed.
Но если мы добавим data.count()
в конце концов, будет ли это правильно (без дубликатов) или нет? очевидно acc
не используется "только внутри действий", так как карта является преобразованием. Так что это не должно быть гарантировано.
С другой стороны, обсуждение связанных билетов Jira говорит о "задачах результата", а не о "действиях". Например, здесь и здесь. Кажется, это указывает на то, что результат действительно гарантированно будет правильным, так как мы используем acc
непосредственно перед и действие и, следовательно, должны быть рассчитаны как один этап.
Я предполагаю, что это понятие "задачи результата" связано с типом задействованных операций, являясь последним, который включает действие, как в этом примере, который показывает, как несколько операций делятся на этапы (в пурпурном, изображение взято отсюда):
Так что гипотетически count()
действие в конце этой цепочки будет частью той же последней стадии, и я гарантирую, что аккумуляторы, использованные на последней карте, не будут содержать дубликатов?
Разъяснение по этому вопросу было бы здорово! Благодарю.
3 ответа
Чтобы ответить на вопрос "Когда аккумуляторы действительно надежны?"
Ответ: когда они присутствуют в операции действия.
Согласно документации в Action Task, даже если присутствуют какие-либо перезапущенные задачи, он обновит Аккумулятор только один раз.
Для обновлений аккумулятора, выполняемых только внутри действий, Spark гарантирует, что обновление каждого задания к аккумулятору будет применено только один раз, т.е. перезапущенные задачи не будут обновлять значение. В преобразованиях пользователи должны знать, что обновление каждой задачи может применяться более одного раза, если задачи или этапы задания выполняются повторно.
А Action позволяют запускать пользовательский код.
Например
val accNotEmpty = sc.accumulator(0)
ip.foreach(x=>{
if(x!=""){
accNotEmpty += 1
}
})
Но, Почему Карта + Действие, а именно. Результат Операции Задачи не надежны для работы Аккумулятора?
- Задача не выполнена из-за какого-то исключения в коде. Spark будет пытаться выполнить это 4 раза (число попыток по умолчанию). Если задача не выполняется каждый раз, когда она выдаст исключение. Если это произойдет случайно, Spark продолжит работу и просто обновит значение аккумулятора для успешного состояния, а значения аккумуляторов с ошибочными состояниями игнорируются.
Вердикт: Правильно - Этапный сбой: если происходит сбой узла-исполнителя, то не ошибка пользователя, а аппаратный сбой - и если узел выходит из строя на этапе случайного воспроизведения. Поскольку выходные данные случайного порядка сохраняются локально, если выходной узел отключается, этот случайный вывод исчезает. Таким образом, Spark исчезает Вернемся к этапу, который сгенерировал выходные данные тасования, просматривает, какие задачи необходимо выполнить повторно, и выполняет их на одном из узлов, который еще жив. После того, как мы регенерируем отсутствующий вывод с тасованием, этап, который сгенерировал выходные данные карты, выполнил некоторое из его задач несколько раз. Spark считает обновления аккумулятора от всех из них.
Вердикт: не обрабатывается в Result Task. Аккумулятор выдаст неверный результат. - Если задача выполняется медленно, Spark может запустить спекулятивную копию этой задачи на другом узле.
Вердикт: не обработан. Аккумулятор выдаст неверный результат. - Кэшируемый RDD огромен и не может находиться в памяти. Поэтому, когда RDD используется, он запускает операцию Map для получения RDD и снова обновляет аккумулятор.
Вердикт: не обработан. Аккумулятор выдаст неверный результат.
Таким образом, может случиться, что одна и та же функция может запускаться несколько раз для одних и тех же данных. Поэтому Spark не дает никаких гарантий обновления аккумулятора из-за операции Map.
Поэтому лучше использовать Аккумулятор в действии в Spark.
Чтобы узнать больше об Аккумуляторе и его проблемах, обратитесь к этому сообщению в блоге - Имран Рашид.
Обновления аккумулятора отправляются обратно водителю, когда задача успешно завершена. Таким образом, результаты вашего накопителя гарантированно будут правильными, если вы уверены, что каждая задача будет выполнена ровно один раз, а каждая задача выполнена так, как вы ожидали.
Я предпочитаю полагаться на reduce
а также aggregate
вместо аккумуляторов, потому что довольно сложно перечислить все способы выполнения задач.
- Действие запускает задачи.
- Если действие зависит от более ранней стадии и результаты этой стадии не (полностью) кэшированы, тогда будут запущены задачи из более ранней стадии.
- Спекулятивное выполнение запускает дублирующие задачи, когда обнаруживается небольшое количество медленных задач.
Тем не менее, есть много простых случаев, когда аккумуляторам можно полностью доверять.
val acc = sc.accumulator(0)
val rdd = sc.parallelize(1 to 10, 2)
val accumulating = rdd.map { x => acc += 1; x }
accumulating.count
assert(acc == 10)
Будет ли это гарантированно правильным (без дубликатов)?
Да, если спекулятивное исполнение отключено. map
и count
будет один этап, так что, как вы говорите, задача не может быть успешно выполнена более одного раза.
Но аккумулятор обновляется как побочный эффект. Поэтому вы должны быть очень осторожны, думая о том, как будет выполняться код. Рассмотрим это вместо accumulating.count
:
// Same setup as before.
accumulating.mapPartitions(p => Iterator(p.next)).collect
assert(acc == 2)
Это также создаст одну задачу для каждого раздела, и каждая задача будет гарантированно выполнена ровно один раз. Но код в map
не будет выполняться для всех элементов, только для первого в каждом разделе.
Аккумулятор похож на глобальную переменную. Если вы передаете ссылку на СДР, которая может увеличивать накопитель, тогда другой код (другие потоки) также может вызвать его приращение.
// Same setup as before.
val x = new X(accumulating) // We don't know what X does.
// It may trigger the calculation
// any number of times.
accumulating.count
assert(acc >= 10)
Я думаю, что Матей ответил на это в упомянутой документации:
Как обсуждалось на https://github.com/apache/spark/pull/2524 довольно сложно обеспечить хорошую семантику в общем случае (обновления аккумулятора на этапах без результата) по следующим причинам:
СДР может быть рассчитана как часть нескольких этапов. Например, если вы обновляете аккумулятор внутри MappedRDD, а затем тасуете его, это может быть одним этапом. Но если затем вы снова вызовите map() для MappedRDD и перемешаете результат, вы получите второй этап, где эта карта является конвейером. Вы хотите сосчитать это обновление аккумулятора дважды или нет?
Целые этапы могут быть повторно отправлены, если случайные файлы удаляются периодическим очистителем или теряются из-за сбоя узла, поэтому все, что отслеживает СДР, должно выполняться в течение длительных периодов времени (пока СДР является ссылочным в пользовательской программе).), что было бы довольно сложно реализовать.
Поэтому я собираюсь пометить это как "не исправлю" пока, за исключением части для этапов результата, сделанных в SPARK-3628.