Как сделать saveAsTextFile НЕ разделить вывод на несколько файлов?

При использовании Scala в Spark, когда я выкидываю результаты, используя saveAsTextFile, кажется, разделить вывод на несколько частей. Я просто передаю параметр (путь) к нему.

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
  1. Соответствует ли количество выходов количеству используемых редукторов?
  2. Означает ли это, что выход сжат?
  3. Я знаю, что могу объединить вывод вместе с помощью bash, но есть ли возможность сохранить вывод в одном текстовом файле, без разделения?? Я посмотрел на документы по API, но об этом мало что сказано.

9 ответов

Решение

Причина, по которой он сохраняет его в виде нескольких файлов, заключается в том, что вычисления распределены. Если выходные данные достаточно малы, так что вы думаете, что можете разместить их на одной машине, тогда вы можете завершить свою программу с помощью

val arr = year.collect()

А затем сохранить полученный массив в виде файла. Другой способ будет использовать пользовательский разделитель, partitionBy и сделайте так, чтобы все шло к одному разделу, хотя это не рекомендуется, потому что вы не получите никакого распараллеливания.

Если вам требуется сохранить файл с saveAsTextFile ты можешь использовать coalesce(1,true).saveAsTextFile(), Это в основном означает, что вычисление затем объединяется в 1 раздел. Вы также можете использовать repartition(1) которая является просто оберткой для coalesce с аргументом shuffle, установленным в true. Изучив исходный код RDD.scala, я понял, что большинство этих вещей вы должны посмотреть.

Для тех, кто работает с большим набором данных:

  • rdd.collect() не следует использовать в этом случае, так как он будет собирать все данные как Array в драйвере, который является самым простым способом из памяти.

  • rdd.coalesce(1).saveAsTextFile() также не следует использовать, поскольку параллелизм восходящих каскадов будет потерян для выполнения на одном узле, откуда будут храниться данные.

  • rdd.coalesce(1, shuffle = true).saveAsTextFile() является лучшим простым вариантом, так как он будет поддерживать параллельную обработку заданий, а затем выполнять только перемешивание на одном узле (rdd.repartition(1).saveAsTextFile() точный синоним).

  • rdd.saveAsSingleTextFile() как предусмотрено ниже, дополнительно позволяет хранить rdd в одном файле с определенным именем, сохраняя свойства параллелизма rdd.coalesce(1, shuffle = true).saveAsTextFile(),

Что может быть неудобно с rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt") является то, что он на самом деле создает файл, путь которого path/to/file.txt/part-00000 и не path/to/file.txt,

Следующее решение rdd.saveAsSingleTextFile("path/to/file.txt") фактически создаст файл, путь которого path/to/file.txt:

package com.whatever.package

import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec

object SparkHelper {

  // This is an implicit class so that saveAsSingleTextFile can be attached to
  // SparkContext and be called like this: sc.saveAsSingleTextFile
  implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {

    def saveAsSingleTextFile(path: String): Unit =
      saveAsSingleTextFileInternal(path, None)

    def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
      saveAsSingleTextFileInternal(path, Some(codec))

    private def saveAsSingleTextFileInternal(
        path: String, codec: Option[Class[_ <: CompressionCodec]]
    ): Unit = {

      // The interface with hdfs:
      val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)

      // Classic saveAsTextFile in a temporary folder:
      hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
      codec match {
        case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
        case None        => rdd.saveAsTextFile(s"$path.tmp")
      }

      // Merge the folder of resulting part-xxxxx into one file:
      hdfs.delete(new Path(path), true) // to make sure it's not there already
      FileUtil.copyMerge(
        hdfs, new Path(s"$path.tmp"),
        hdfs, new Path(path),
        true, rdd.sparkContext.hadoopConfiguration, null
      )

      hdfs.delete(new Path(s"$path.tmp"), true)
    }
  }
}

который можно использовать таким образом:

import com.whatever.package.SparkHelper.RDDExtensions

rdd.saveAsSingleTextFile("path/to/file.txt")

// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])

Этот фрагмент сначала сохраняет RDD с rdd.saveAsTextFile("path/to/file.txt") во временной папке path/to/file.txt.tmp как будто мы не хотим хранить данные в одном файле (что обеспечивает параллельную обработку исходных задач).

И только затем, используя api файловой системы hadoop, мы продолжаем слияние (FileUtil.copyMerge()) различных выходных файлов, чтобы создать наш конечный выходной файл path/to/file.txt,

Вы могли бы позвонить coalesce(1) а потом saveAsTextFile() - но это может быть плохой идеей, если у вас много данных. Отдельные файлы на разделение генерируются так же, как в Hadoop, чтобы позволить отдельным картографам и редукторам записывать в разные файлы. Наличие одного выходного файла - хорошая идея, только если у вас очень мало данных, и в этом случае вы также можете выполнить collect(), как сказал @aaronman.

Как уже упоминали другие, вы можете собирать или объединять свой набор данных, чтобы заставить Spark создать один файл. Но это также ограничивает количество задач Spark, которые могут параллельно работать с вашим набором данных. Я предпочитаю позволить ему создать сотню файлов в выходной директории HDFS, а затем использовать hadoop fs -getmerge /hdfs/dir /local/file.txt чтобы извлечь результаты в один файл в локальной файловой системе. Это имеет смысл, когда вы выводите сравнительно небольшой отчет, конечно.

В Spark 1.6.1 этот формат показан ниже. Он создает один выходной файл. Рекомендуется использовать его, если выходные данные достаточно малы для обработки. В основном это то, что он возвращает новый RDD, сокращенный до разделов numPartitions. Если вы выполняете резкое объединение, например, для numPartitions = 1, это может привести к тому, что ваши вычисления будут выполняться на меньшем количестве узлов, чем вам нравится (например, один узел в случае numPartitions = 1)

pair_result.coalesce(1).saveAsTextFile("/app/data/")

Ты можешь позвонить repartition() и следуйте по этому пути:

val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)

var repartitioned = year.repartition(1)
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")

Вы сможете сделать это в следующей версии Spark, в текущей версии 1.0.0 это невозможно, если вы не сделаете это вручную, например, как вы упомянули, с помощью вызова скрипта bash.

Я также хочу упомянуть, что в документации четко указано, что пользователи должны быть осторожны при соединении с реальным небольшим числом разделов. это может привести к тому, что вышестоящие разделы наследуют это количество разделов.

Я бы не рекомендовал использовать coalesce(1), если это действительно не требуется.

Вот мой ответ на вывод одного файла. Я только добавил coalesce(1)

val year = sc.textFile("apat63_99.txt")
              .map(_.split(",")(1))
              .flatMap(_.split(","))
              .map((_,1))
              .reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")

Код:

year.coalesce(1).saveAsTextFile("year")
Другие вопросы по тегам