Разделите данные Spark DataFrame на отдельные файлы
У меня есть следующий ввод DataFrame из файла s3 и мне нужно преобразовать данные в следующий желаемый вывод. Я использую Spark версии 1.5.1 со Scala, но могу перейти на Spark с Python. Любые предложения приветствуются.
Ввод DataFrame:
name animal data
john mouse aaaaa
bob mouse bbbbb
bob mouse ccccc
bob dog ddddd
Желаемый результат:
john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv
terminal$ cat bob/mouse/file.csv
bbbbb
ccccc
terminal$ cat bob/dog/file.csv
ddddd
Вот мой существующий код Spark Scala, который я пробовал:
val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)
val df = sqlc.read.json("raw.gz")
val cols = Seq("name", "animal")
df.groupBy(cols.head, cols.tail: _*).count().take(100).foreach(println)
Токовый выход:
[john,mouse,1]
[bob,mouse,2]
[bob,dog,1]
Некоторые проблемы с моим существующим кодом состоят в том, что groupBy возвращает объект GroupedData, и я, вероятно, не хочу выполнять функцию count/sum/agg для этих данных. Я ищу лучшую технику для группировки и вывода данных. Набор данных очень большой.
1 ответ
Это может быть достигнуто с помощью partitionBy
вариант DataFrameWriter
, Общий синтаксис выглядит следующим образом:
df.write.partitionBy("name", "animal").format(...).save(...)
К сожалению, единственный простой текстовый формат, который поддерживает разбиение в Spark 1.5, - это JSON.
Если вы можете обновить установку Spark до:
- 1.6 - вы можете использовать
partitionBy
сtext
формат. 1.6 также требуется, если вам нужен один выходной файл для группы (repartition
). - 2.0 - вы можете использовать
partitionBy
сcsv
формат.
Я считаю, что в 1.5 ваш лучший вариант - записывать файлы в формате JSON и конвертировать отдельные выходные файлы.
Если число отличных name', 'animals
мала, вы можете попробовать выполнить отдельную запись для каждой группы:
val dist = df.select("name", "animal").rdd.collect.map {
case Row(name: String, animal: String) => (name, animal)
}
for {
(name, animal) <- dist
} df.where($"name" === name && $"animal" === animal)
.select($"data").write.format("csv").save(s"/prefix/$name/$animal")
но это не будет масштабироваться, когда число комбинаций растет.