Найти размер данных, хранящихся в rdd из текстового файла в Apache Spark

Я новичок в Apache Spark (версия 1.4.1). Я написал небольшой код для чтения текстового файла и сохранил его данные в Rdd .

Есть ли способ, которым я могу получить размер данных в RDD.

Это мой код:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.util.SizeEstimator
import org.apache.spark.sql.Row

object RddSize {

  def main(args: Array[String]) {

    val sc = new SparkContext("local", "data size")
    val FILE_LOCATION = "src/main/resources/employees.csv"
    val peopleRdd = sc.textFile(FILE_LOCATION)

    val newRdd = peopleRdd.filter(str => str.contains(",M,"))
    //Here I want to find whats the size remaining data
  }
} 

Я хочу получить размер данных до фильтра Transformation (peopleRdd) и после него (newRdd).

3 ответа

Решение

Есть несколько способов получить размер RDD

1. Добавьте искровой слушатель в свой искровой контекст

SparkDriver.getContext.addSparkListener(new SparkListener() {
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
  val map = stageCompleted.stageInfo.rddInfos
  map.foreach(row => {
      println("rdd memSize " + row.memSize)
      println("rdd diskSize " + row.diskSize)
   })
}})

2. Сохраните свой rdd как текстовый файл.

myRDD.saveAsTextFile("person.txt")

и вызвать Apache Spark REST API.

/applications/[app-id]/stages

3. Вы также можете попробовать SizeEstimater

val rddSize = SizeEstimator.estimate(myRDD)

Я не уверен, что вам нужно сделать это. Вы можете кэшировать rdd и проверить размер в интерфейсе Spark. Но допустим, что вы хотите сделать это программно, вот решение.

    def calcRDDSize(rdd: RDD[String]): Long = {
        //map to the size of each string, UTF-8 is the default
        rdd.map(_.getBytes("UTF-8").length.toLong) 
           .reduce(_+_) //add the sizes together
    }

Затем вы можете вызвать эту функцию для двух ваших RDD:

println(s"peopleRdd is [${calcRDDSize(peopleRdd)}] bytes in size")
println(s"newRdd is [${calcRDDSize(newRdd)}] bytes in size")

Это решение должно работать, даже если размер файла больше, чем объем памяти, доступной в кластере.

Документ API Spark гласит:

  1. Вы можете получить информацию о ваших RDD из контекста Spark: sc.getRDDStorageInfo
  2. Информация RDD включает в себя память и размер диска: RDDInfo doc
Другие вопросы по тегам