Как объединить результаты в Scalding

Я пытаюсь вывести канал в разные каталоги так, чтобы выходные данные каждого каталога были объединены на основе некоторых идентификаторов. Таким образом, в простом коде сокращения кода я бы использовал класс MultipleOutputs, и я бы сделал что-то подобное в редукторе.

protected void reduce(final SomeKey key,
      final Iterable<SomeValue> values,
      final Context context) {

   ...
   for (SomeValue value: values) {
     String bucketId = computeBucketIdFrom(...);
     multipleOutputs.write(key, value, folderName + "/" + bucketId);
   ...

Так что я думаю, что это можно сделать в ожогах

...
  val somePipe = Csv(in, separator = "\t",
        fields = someSchema,
        skipHeader = true)
    .read

  for (i <- 1 until numberOfBuckets) {
    somePipe
    .filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i}
    .write(Csv(out + "/bucket" + i ,
      writeHeader = true,
      separator = "\t"))
  }

Но я чувствую, что вам придется много раз повторять одну и ту же трубу, и это повлияет на общую производительность.

Есть ли другие альтернативы?

Спасибо

1 ответ

Да, конечно, есть лучший способ использования TemplatedTsv.

Таким образом, ваш код выше может быть написан следующим образом:

val somePipe = Tsv(in, fields = someSchema, skipHeader = true)
    .read
    .write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true))

Это поместит все записи из some_id в отдельные папки в папке out/ some_ids.

Однако вы также можете создавать целочисленные сегменты. Просто измените последние строки,

.map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }    
.write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket)))

Это создаст двухзначные папки как out/ dd /. Вы также можете проверить TemplatedTsv API здесь.

Может быть небольшая проблема с использованием templatedTsv, то есть редукторы могут генерировать много маленьких файлов, которые могут быть плохими для следующей работы, используя ваши результаты. Поэтому перед записью на диск лучше отсортировать поля шаблона. Я написал блог об этом здесь.

Другие вопросы по тегам