Разделите данные Spark DataFrame на отдельные файлы

У меня есть следующий ввод DataFrame из файла s3 и мне нужно преобразовать данные в следующий желаемый вывод. Я использую Spark версии 1.5.1 со Scala, но могу перейти на Spark с Python. Любые предложения приветствуются.

Ввод DataFrame:

name    animal   data
john    mouse    aaaaa
bob     mouse    bbbbb
bob     mouse    ccccc
bob     dog      ddddd

Желаемый результат:

john/mouse/file.csv
bob/mouse/file.csv
bob/dog/file.csv

terminal$ cat bob/mouse/file.csv
bbbbb
ccccc

terminal$ cat bob/dog/file.csv
ddddd

Вот мой существующий код Spark Scala, который я пробовал:

val sc = new SparkContext(new SparkConf())
val sqlc = new org.apache.spark.sql.SQLContext(sc)
val df = sqlc.read.json("raw.gz")
val cols = Seq("name", "animal")
df.groupBy(cols.head, cols.tail: _*).count().take(100).foreach(println)

Токовый выход:

[john,mouse,1]
[bob,mouse,2]
[bob,dog,1]

Некоторые проблемы с моим существующим кодом состоят в том, что groupBy возвращает объект GroupedData, и я, вероятно, не хочу выполнять функцию count/sum/agg для этих данных. Я ищу лучшую технику для группировки и вывода данных. Набор данных очень большой.

1 ответ

Решение

Это может быть достигнуто с помощью partitionBy вариант DataFrameWriter, Общий синтаксис выглядит следующим образом:

df.write.partitionBy("name", "animal").format(...).save(...)

К сожалению, единственный простой текстовый формат, который поддерживает разбиение в Spark 1.5, - это JSON.

Если вы можете обновить установку Spark до:

  • 1.6 - вы можете использовать partitionBy с text формат. 1.6 также требуется, если вам нужен один выходной файл для группы (repartition).
  • 2.0 - вы можете использовать partitionBy с csv формат.

Я считаю, что в 1.5 ваш лучший вариант - записывать файлы в формате JSON и конвертировать отдельные выходные файлы.

Если число отличных name', 'animals мала, вы можете попробовать выполнить отдельную запись для каждой группы:

val dist = df.select("name", "animal").rdd.collect.map {
  case Row(name: String, animal: String) => (name, animal)
}

for {
  (name, animal) <- dist
} df.where($"name" === name && $"animal" === animal)
    .select($"data").write.format("csv").save(s"/prefix/$name/$animal")

но это не будет масштабироваться, когда число комбинаций растет.

Другие вопросы по тегам