Как вывести данные с помощью структуры каталогов в стиле Hive в Scalding?
Мы используем Scalding для создания ETL и генерируем вывод в виде таблицы Hive с разделами. Следовательно, мы хотим, чтобы имена каталогов для разделов были чем-то вроде "state=CA", например. Мы используем TemplatedTsv следующим образом:
pipe
// some other ETL
.map('STATE -> 'hdfs_state) { state: Int => "State=" + state }
.groupBy('hdfs_state) { _.pass }
.write(TemplatedTsv(baseOutputPath, "%s", 'hdfs_state,
writeHeader = false,
sinkMode = SinkMode.UPDATE,
fields = ('all except 'hdfs_state)))
Мы принимаем пример кода из Как собрать результаты в Scalding. Вот две проблемы, которые у нас есть:
- кроме как не может быть решено IntelliJ: я пропускаю какой-то импорт? Мы не хотим явно вводить все поля в выражении "fields = ()", так как поля получены из кода внутри выражения groupBy. Если ввести явно, они могут быть легко не синхронизированы.
- Этот подход выглядит слишком странным, поскольку мы создаем дополнительный столбец, чтобы имена каталогов могли обрабатываться Hive/Hcatalog. Мы задаемся вопросом, каким должен быть правильный способ достичь этого?
Большое спасибо!
1 ответ
Извините, предыдущий пример был псевдокодом. Ниже я приведу небольшой код с примером ввода данных.
Обратите внимание, что это работает только с Scalding версии 0.12.0 или выше.
Давайте представим, что у нас есть входные данные, как показано ниже, которые определяют некоторые данные о покупке,
user1 1384034400 6 75
user1 1384038000 6 175
user2 1383984000 48 3
user3 1383958800 48 281
user3 1384027200 9 7
user3 1384027200 9 11
user4 1383955200 37 705
user4 1383955200 37 15
user4 1383969600 36 41
user4 1383969600 36 21
Вкладка разделена и 3-й столбец является номером государства. Здесь у нас есть целое число, но для строковых состояний вы можете легко адаптироваться.
Этот код будет читать входные данные и помещать их в области выходных папок State =stateid.
class TemplatedTsvExample(args: Args) extends Job(args) {
val purchasesPath = args("purchases")
val outputPath = args("output")
// defines both input & output schema, you can also make separate for each of them
val ioSchema = ('USERID, 'TIMESTAMP, 'STATE, 'PURCHASE)
val Purchases =
Tsv(purchasesPath, ioSchema)
.read
.map('STATE -> 'STATENAME) { state: Int => "State=" + state } // here you can make necessary changes
.groupBy('STATENAME) { _.pass } // this is optional
.write(TemplatedTsv(outputPath, "%s", 'STATENAME, false, SinkMode.REPLACE, ioSchema))
}
Я надеюсь, что это полезно. Пожалуйста, спросите меня, если что-то не понятно.
Вы можете найти полный код здесь.