Как перехватить частичные обновления аккумуляторов на драйвере?
Spark 1.5.1 + Java 1.8
Мы используем spark для загрузки большого количества записей в базу данных.
Код действия выглядит так:
rdd.foreachPartition(new VoidFunction<Iterator<T>>() {
@Override
public void call(Iterator<T> iter) {
//while there are more records perform the following every 1000 records
//int[] recoords = statement.executeBatch();
//accumulator.add(recoords.length);
}
// ...
}
На узле драйвера есть поток, который контролирует значение аккумулятора. Однако значение не обновляется. Он обновляется только один раз, к тому времени, как заканчивается приложение. Даже если аккумуляторы использовали настройку отложенных значений, она должна корректно обновляться, поскольку я периодически читаю значение в потоке узла драйвера.
Я неправильно использую аккумулятор? Могу ли я в любом случае более непрерывно отслеживать прогресс со стороны моих работников?
1 ответ
Вы можете контролировать значение аккумулятора, но это не может быть сделано непрерывно, то есть пока обновления происходят после завершения задач.
Хотя аккумуляторы называются общими переменными, на самом деле они не являются общими. Каждое задание получает свой собственный аккумулятор, который объединяется после завершения задания. Это означает, что глобальные значения не могут быть обновлены во время выполнения задачи.
Чтобы увидеть обновления, количество исполнителей должно быть меньше количества обработанных разделов (что соответствует количеству задач). Причиной этого является введение "барьера", когда обновления аккумулятора отправляются водителю.
Например:
import org.apache.spark.{SparkConf, SparkContext}
object App {
def main(args: Array[String]) {
val conf = new SparkConf().setMaster("local[4]")
val sc = new SparkContext(conf)
val accum = sc.accumulator(0, "An Accumulator")
val rdd = sc.parallelize(1 to 1000, 20)
import scala.concurrent.duration._
import scala.language.postfixOps
import rx.lang.scala._
val o = Observable.interval(1000 millis).take(1000)
val s = o.subscribe(_ => println(accum.value))
rdd.foreach(x => {
Thread.sleep(x + 200)
accum += 1
})
s.unsubscribe
sc.stop
}
}
Как видите, глобальное значение обновляется только один раз для каждой задачи.
Если вы создаете именованный аккумулятор, как в приведенном примере, вы также можете отслеживать его состояние с помощью Spark UI. Просто откройте вкладку Этапы, перейдите к определенной стадии и проверьте раздел аккумуляторов.
Могу ли я в любом случае более непрерывно отслеживать прогресс со стороны моих работников?
Наиболее надежный подход заключается в увеличении степени детализации за счет добавления большего количества разделов, но это не обходится дешевле.