Spark: как узнать количество написанных строк?
Мне интересно, есть ли способ узнать количество строк, записанных с помощью операции сохранения Spark. Я знаю, что достаточно сделать подсчет на СДР, прежде чем писать его, но я хотел бы знать, есть ли способ получить ту же информацию, не делая этого.
Спасибо марко
3 ответа
Если вы действительно хотите, вы можете добавить пользовательский слушатель и извлечь количество записанных строк из outputMetrics
, Очень простой пример может выглядеть так:
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
var recordsWrittenCount = 0L
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
synchronized {
recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten
}
}
})
sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
// Long = 10
но эта часть API предназначена для внутреннего использования.
Принятый ответ более точно соответствует конкретным потребностям ОП (как указано в различных комментариях), тем не менее этот ответ подойдет большинству.
Наиболее эффективным подходом является использование Аккумулятора: http://spark.apache.org/docs/latest/programming-guide.html
val accum = sc.accumulator(0L)
data.map { x =>
accum += 1
x
}
.saveAsTextFile(path)
val count = accum.value
Затем вы можете обернуть это в полезного сутенера:
implicit class PimpedStringRDD(rdd: RDD[String]) {
def saveAsTextFileAndCount(p: String): Long = {
val accum = rdd.sparkContext.accumulator(0L)
rdd.map { x =>
accum += 1
x
}
.saveAsTextFile(p)
accum.value
}
}
Так что вы можете сделать
val count = data.saveAsTextFileAndCount(path)
Если вы посмотрите на
taskEnd.taskInfo.accumulables
Вы увидите, что он связан со следующими AccumulableInfo
в ListBuffer
в последовательном порядке.
AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None),
AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None), AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None),
AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None),
AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None),
AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None),
AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql)
Вы можете ясно видеть, что количество выходных строк находится на 7-й позиции listBuffer, поэтому правильный способ получить количество записываемых строк -
taskEnd.taskInfo.accumulables(6).value.get
Мы можем получить строки, написанные следующим образом (я только что изменил ответ @zero323)
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
var recordsWrittenCount = 0L
sc.addSparkListener(new SparkListener() {
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
synchronized {
recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long]
}
}
})
sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount