Как вывести данные с помощью структуры каталогов в стиле Hive в Scalding?

Мы используем Scalding для создания ETL и генерируем вывод в виде таблицы Hive с разделами. Следовательно, мы хотим, чтобы имена каталогов для разделов были чем-то вроде "state=CA", например. Мы используем TemplatedTsv следующим образом:

pipe
   // some other ETL
   .map('STATE -> 'hdfs_state) { state: Int => "State=" + state }
   .groupBy('hdfs_state) { _.pass }
   .write(TemplatedTsv(baseOutputPath, "%s", 'hdfs_state,
          writeHeader = false,
          sinkMode = SinkMode.UPDATE,
          fields = ('all except 'hdfs_state)))

Мы принимаем пример кода из Как собрать результаты в Scalding. Вот две проблемы, которые у нас есть:

  • кроме как не может быть решено IntelliJ: я пропускаю какой-то импорт? Мы не хотим явно вводить все поля в выражении "fields = ()", так как поля получены из кода внутри выражения groupBy. Если ввести явно, они могут быть легко не синхронизированы.
  • Этот подход выглядит слишком странным, поскольку мы создаем дополнительный столбец, чтобы имена каталогов могли обрабатываться Hive/Hcatalog. Мы задаемся вопросом, каким должен быть правильный способ достичь этого?

Большое спасибо!

1 ответ

Извините, предыдущий пример был псевдокодом. Ниже я приведу небольшой код с примером ввода данных.

Обратите внимание, что это работает только с Scalding версии 0.12.0 или выше.

Давайте представим, что у нас есть входные данные, как показано ниже, которые определяют некоторые данные о покупке,

user1   1384034400  6   75
user1   1384038000  6   175
user2   1383984000  48  3
user3   1383958800  48  281
user3   1384027200  9   7
user3   1384027200  9   11
user4   1383955200  37  705
user4   1383955200  37  15
user4   1383969600  36  41
user4   1383969600  36  21

Вкладка разделена и 3-й столбец является номером государства. Здесь у нас есть целое число, но для строковых состояний вы можете легко адаптироваться.

Этот код будет читать входные данные и помещать их в области выходных папок State =stateid.

class TemplatedTsvExample(args: Args) extends Job(args) {

  val purchasesPath = args("purchases")
  val outputPath    = args("output")

  // defines both input & output schema, you can also make separate for each of them
  val ioSchema = ('USERID, 'TIMESTAMP, 'STATE, 'PURCHASE)

  val Purchases =
     Tsv(purchasesPath, ioSchema)
     .read
     .map('STATE -> 'STATENAME) { state: Int => "State=" + state } // here you can make necessary changes
     .groupBy('STATENAME) { _.pass } // this is optional
     .write(TemplatedTsv(outputPath, "%s", 'STATENAME, false, SinkMode.REPLACE, ioSchema))
} 

Я надеюсь, что это полезно. Пожалуйста, спросите меня, если что-то не понятно.

Вы можете найти полный код здесь.

Другие вопросы по тегам