Символы становятся поврежденными, если spark.executor.memory не установлен должным образом при импорте CSV в DataFrame

ОБНОВЛЕНИЕ: Пожалуйста, держитесь за этот вопрос. Я обнаружил, что это может быть проблемой самого Spark 1.5, поскольку я не использую официальную версию Spark. Я буду обновлять этот вопрос. Спасибо!

Недавно я заметил странную ошибку при использовании Spark-CSV для импорта CSV в DataFrame в Spark.

Вот мой пример кода:

  object sparktry
  {
    def main(args: Array[String])
    {
      AutoLogger.setLevel("INFO")

      val sc = SingletonSparkContext.getInstance()
      val sql_context = SingletonSQLContext.getInstance(sc)

      val options = new collection.mutable.HashMap[String, String]()
      options += "header" -> "true"
      options += "charset" -> "UTF-8"

      val customSchema = StructType(Array(
        StructField("Year", StringType),
        StructField("Brand", StringType),
        StructField("Category", StringType),
        StructField("Model", StringType),
        StructField("Sales", DoubleType)))

      val dataFrame = sql_context.read.format("com.databricks.spark.csv")
      .options(options)
      .schema(customSchema)
      .load("hdfs://myHDFSserver:9000/BigData/CarSales.csv")

      dataFrame.head(10).foreach(x => AutoLogger.info(x.toString))
    }
  }

CarSales очень маленький CSV. Я заметил, что когда spark.master не является local, настройка spark.executor.memory выше 16 ГБ приведет к повреждению DataFrame. Вывод этой программы показан ниже: (Я скопировал текст из журнала, и в этом случае spark.executor.memory установлен на 32 ГБ)

16/03/07 12:39:50.190 INFO DAGScheduler: Job 1 finished: head at sparktry.scala:35, took 8.009183 s
16/03/07 12:39:50.225 INFO AutoLogger$: [       ,  ,      ,ries       ,142490.0]
16/03/07 12:39:50.225 INFO AutoLogger$: [       ,  ,      ,ries       ,112464.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,90960.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,100910.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,94371.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,      ,ries       ,54142.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,       ,ries       ,14773.0]
16/03/07 12:39:50.226 INFO AutoLogger$: [       ,  ,       ,ries       ,12276.0]
16/03/07 12:39:50.227 INFO AutoLogger$: [       ,  ,       ,ries       ,9254.0]
16/03/07 12:39:50.227 INFO AutoLogger$: [       ,  ,       ,ries       ,12253.0]

Хотя первые 10 строк этого файла:

1/1/2007,BMW,Compact,BMW 3-Series,142490.00
1/1/2008,BMW,Compact,BMW 3-Series,112464.00
1/1/2009,BMW,Compact,BMW 3-Series,90960.00
1/1/2010,BMW,Compact,BMW 3-Series,100910.00
1/1/2011,BMW,Compact,BMW 3-Series,94371.00
1/1/2007,BMW,Compact,BMW 5-Series,54142.00
1/1/2007,BMW,Fullsize,BMW 7-Series,14773.00
1/1/2008,BMW,Fullsize,BMW 7-Series,12276.00
1/1/2009,BMW,Fullsize,BMW 7-Series,9254.00
1/1/2010,BMW,Fullsize,BMW 7-Series,12253.00

Я заметил, что только изменяя spark.executor.memory до 16 ГБ на моей машине, первые 10 строк верны, но установка более 16 ГБ приведет к повреждению.

Более того: на одном из моих серверов с 256 ГБ памяти установка 16 ГБ также приводит к этой ошибке. Вместо этого, установив его на 48 ГБ, он будет работать нормально. Кроме того, я попытался напечатать dataFrame.rdd, он показывает, что содержимое RDD является правильным, а сам фрейм данных - нет.

Кто-нибудь имеет представление об этой проблеме?

Спасибо!

2 ответа

Решение

Оказывается, это ошибка при сериализации с Kyro в Spark 1.5.1 и 1.5.2.

https://github.com/databricks/spark-csv/issues/285

Это исправлено в 1.6.0. Это не имеет ничего общего с spark-CSV.

Я запустил ваш код и смог извлечь данные в формате csv из hdfs с конфигурацией Spark по умолчанию.

Я обновил ваш код для следующих строк:

val conf = new org.apache.spark.SparkConf().setMaster("local[2]").setAppName("HDFSReadDemo");
val sc = new org.apache.spark.SparkContext(conf); 
val sql_context = new org.apache.spark.sql.SQLContext(sc) 

И println() вместо регистратора.

dataFrame.head(10).foreach(x => println(x))

Так что с настройкой памяти Spark все должно быть в порядке (т.е. spark.executor.memory)

Другие вопросы по тегам