Как сделать saveAsTextFile НЕ разделить вывод на несколько файлов?
При использовании Scala в Spark, когда я выкидываю результаты, используя saveAsTextFile
, кажется, разделить вывод на несколько частей. Я просто передаю параметр (путь) к нему.
val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
- Соответствует ли количество выходов количеству используемых редукторов?
- Означает ли это, что выход сжат?
- Я знаю, что могу объединить вывод вместе с помощью bash, но есть ли возможность сохранить вывод в одном текстовом файле, без разделения?? Я посмотрел на документы по API, но об этом мало что сказано.
9 ответов
Причина, по которой он сохраняет его в виде нескольких файлов, заключается в том, что вычисления распределены. Если выходные данные достаточно малы, так что вы думаете, что можете разместить их на одной машине, тогда вы можете завершить свою программу с помощью
val arr = year.collect()
А затем сохранить полученный массив в виде файла. Другой способ будет использовать пользовательский разделитель, partitionBy
и сделайте так, чтобы все шло к одному разделу, хотя это не рекомендуется, потому что вы не получите никакого распараллеливания.
Если вам требуется сохранить файл с saveAsTextFile
ты можешь использовать coalesce(1,true).saveAsTextFile()
, Это в основном означает, что вычисление затем объединяется в 1 раздел. Вы также можете использовать repartition(1)
которая является просто оберткой для coalesce
с аргументом shuffle, установленным в true. Изучив исходный код RDD.scala, я понял, что большинство этих вещей вы должны посмотреть.
Для тех, кто работает с большим набором данных:
rdd.collect()
не следует использовать в этом случае, так как он будет собирать все данные какArray
в драйвере, который является самым простым способом из памяти.rdd.coalesce(1).saveAsTextFile()
также не следует использовать, поскольку параллелизм восходящих каскадов будет потерян для выполнения на одном узле, откуда будут храниться данные.rdd.coalesce(1, shuffle = true).saveAsTextFile()
является лучшим простым вариантом, так как он будет поддерживать параллельную обработку заданий, а затем выполнять только перемешивание на одном узле (rdd.repartition(1).saveAsTextFile()
точный синоним).rdd.saveAsSingleTextFile()
как предусмотрено ниже, дополнительно позволяет хранить rdd в одном файле с определенным именем, сохраняя свойства параллелизмаrdd.coalesce(1, shuffle = true).saveAsTextFile()
,
Что может быть неудобно с rdd.coalesce(1, shuffle = true).saveAsTextFile("path/to/file.txt")
является то, что он на самом деле создает файл, путь которого path/to/file.txt/part-00000
и не path/to/file.txt
,
Следующее решение rdd.saveAsSingleTextFile("path/to/file.txt")
фактически создаст файл, путь которого path/to/file.txt
:
package com.whatever.package
import org.apache.spark.rdd.RDD
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
import org.apache.hadoop.io.compress.CompressionCodec
object SparkHelper {
// This is an implicit class so that saveAsSingleTextFile can be attached to
// SparkContext and be called like this: sc.saveAsSingleTextFile
implicit class RDDExtensions(val rdd: RDD[String]) extends AnyVal {
def saveAsSingleTextFile(path: String): Unit =
saveAsSingleTextFileInternal(path, None)
def saveAsSingleTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit =
saveAsSingleTextFileInternal(path, Some(codec))
private def saveAsSingleTextFileInternal(
path: String, codec: Option[Class[_ <: CompressionCodec]]
): Unit = {
// The interface with hdfs:
val hdfs = FileSystem.get(rdd.sparkContext.hadoopConfiguration)
// Classic saveAsTextFile in a temporary folder:
hdfs.delete(new Path(s"$path.tmp"), true) // to make sure it's not there already
codec match {
case Some(codec) => rdd.saveAsTextFile(s"$path.tmp", codec)
case None => rdd.saveAsTextFile(s"$path.tmp")
}
// Merge the folder of resulting part-xxxxx into one file:
hdfs.delete(new Path(path), true) // to make sure it's not there already
FileUtil.copyMerge(
hdfs, new Path(s"$path.tmp"),
hdfs, new Path(path),
true, rdd.sparkContext.hadoopConfiguration, null
)
hdfs.delete(new Path(s"$path.tmp"), true)
}
}
}
который можно использовать таким образом:
import com.whatever.package.SparkHelper.RDDExtensions
rdd.saveAsSingleTextFile("path/to/file.txt")
// Or if the produced file is to be compressed:
import org.apache.hadoop.io.compress.GzipCodec
rdd.saveAsSingleTextFile("path/to/file.txt.gz", classOf[GzipCodec])
Этот фрагмент сначала сохраняет RDD с rdd.saveAsTextFile("path/to/file.txt")
во временной папке path/to/file.txt.tmp
как будто мы не хотим хранить данные в одном файле (что обеспечивает параллельную обработку исходных задач).
И только затем, используя api файловой системы hadoop, мы продолжаем слияние (FileUtil.copyMerge()
) различных выходных файлов, чтобы создать наш конечный выходной файл path/to/file.txt
,
Вы могли бы позвонить coalesce(1)
а потом saveAsTextFile()
- но это может быть плохой идеей, если у вас много данных. Отдельные файлы на разделение генерируются так же, как в Hadoop, чтобы позволить отдельным картографам и редукторам записывать в разные файлы. Наличие одного выходного файла - хорошая идея, только если у вас очень мало данных, и в этом случае вы также можете выполнить collect(), как сказал @aaronman.
Как уже упоминали другие, вы можете собирать или объединять свой набор данных, чтобы заставить Spark создать один файл. Но это также ограничивает количество задач Spark, которые могут параллельно работать с вашим набором данных. Я предпочитаю позволить ему создать сотню файлов в выходной директории HDFS, а затем использовать hadoop fs -getmerge /hdfs/dir /local/file.txt
чтобы извлечь результаты в один файл в локальной файловой системе. Это имеет смысл, когда вы выводите сравнительно небольшой отчет, конечно.
В Spark 1.6.1 этот формат показан ниже. Он создает один выходной файл. Рекомендуется использовать его, если выходные данные достаточно малы для обработки. В основном это то, что он возвращает новый RDD, сокращенный до разделов numPartitions. Если вы выполняете резкое объединение, например, для numPartitions = 1, это может привести к тому, что ваши вычисления будут выполняться на меньшем количестве узлов, чем вам нравится (например, один узел в случае numPartitions = 1)
pair_result.coalesce(1).saveAsTextFile("/app/data/")
Ты можешь позвонить repartition()
и следуйте по этому пути:
val year = sc.textFile("apat63_99.txt").map(_.split(",")(1)).flatMap(_.split(",")).map((_,1)).reduceByKey((_+_)).map(_.swap)
var repartitioned = year.repartition(1)
repartitioned.saveAsTextFile("C:/Users/TheBhaskarDas/Desktop/wc_spark00")
Вы сможете сделать это в следующей версии Spark, в текущей версии 1.0.0 это невозможно, если вы не сделаете это вручную, например, как вы упомянули, с помощью вызова скрипта bash.
Я также хочу упомянуть, что в документации четко указано, что пользователи должны быть осторожны при соединении с реальным небольшим числом разделов. это может привести к тому, что вышестоящие разделы наследуют это количество разделов.
Я бы не рекомендовал использовать coalesce(1), если это действительно не требуется.
Вот мой ответ на вывод одного файла. Я только добавил coalesce(1)
val year = sc.textFile("apat63_99.txt")
.map(_.split(",")(1))
.flatMap(_.split(","))
.map((_,1))
.reduceByKey((_+_)).map(_.swap)
year.saveAsTextFile("year")
Код:
year.coalesce(1).saveAsTextFile("year")