Spark: как узнать количество написанных строк?

Мне интересно, есть ли способ узнать количество строк, записанных с помощью операции сохранения Spark. Я знаю, что достаточно сделать подсчет на СДР, прежде чем писать его, но я хотел бы знать, есть ли способ получить ту же информацию, не делая этого.

Спасибо марко

3 ответа

Решение

Если вы действительно хотите, вы можете добавить пользовательский слушатель и извлечь количество записанных строк из outputMetrics, Очень простой пример может выглядеть так:

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenCount = 0L

sc.addSparkListener(new SparkListener() { 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    synchronized {
      recordsWrittenCount += taskEnd.taskMetrics.outputMetrics.recordsWritten 
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
// Long = 10

но эта часть API предназначена для внутреннего использования.

Принятый ответ более точно соответствует конкретным потребностям ОП (как указано в различных комментариях), тем не менее этот ответ подойдет большинству.

Наиболее эффективным подходом является использование Аккумулятора: http://spark.apache.org/docs/latest/programming-guide.html

val accum = sc.accumulator(0L)

data.map { x =>
  accum += 1
  x
}
.saveAsTextFile(path)

val count = accum.value

Затем вы можете обернуть это в полезного сутенера:

implicit class PimpedStringRDD(rdd: RDD[String]) {
  def saveAsTextFileAndCount(p: String): Long = {
    val accum = rdd.sparkContext.accumulator(0L)

    rdd.map { x =>
      accum += 1
      x
    }
    .saveAsTextFile(p)

    accum.value
  }
}

Так что вы можете сделать

val count = data.saveAsTextFileAndCount(path)

Если вы посмотрите на

taskEnd.taskInfo.accumulables

Вы увидите, что он связан со следующими AccumulableInfo в ListBuffer в последовательном порядке.

AccumulableInfo(1,Some(internal.metrics.executorDeserializeTime),Some(33),Some(33),true,true,None), 
AccumulableInfo(2,Some(internal.metrics.executorDeserializeCpuTime),Some(32067956),Some(32067956),true,true,None), AccumulableInfo(3,Some(internal.metrics.executorRunTime),Some(325),Some(325),true,true,None), 
AccumulableInfo(4,Some(internal.metrics.executorCpuTime),Some(320581946),Some(320581946),true,true,None), 
AccumulableInfo(5,Some(internal.metrics.resultSize),Some(1459),Some(1459),true,true,None), 
AccumulableInfo(7,Some(internal.metrics.resultSerializationTime),Some(1),Some(1),true,true,None), 
AccumulableInfo(0,Some(number of output rows),Some(3),Some(3),true,true,Some(sql)

Вы можете ясно видеть, что количество выходных строк находится на 7-й позиции listBuffer, поэтому правильный способ получить количество записываемых строк -

taskEnd.taskInfo.accumulables(6).value.get

Мы можем получить строки, написанные следующим образом (я только что изменил ответ @zero323)

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

var recordsWrittenCount = 0L

sc.addSparkListener(new SparkListener() { 
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
    synchronized {
      recordsWrittenCount += taskEnd.taskInfo.accumulables(6).value.get.asInstanceOf[Long] 
    }
  }
})

sc.parallelize(1 to 10, 2).saveAsTextFile("/tmp/foobar")
recordsWrittenCount
Другие вопросы по тегам