Как объединить результаты в Scalding
Я пытаюсь вывести канал в разные каталоги так, чтобы выходные данные каждого каталога были объединены на основе некоторых идентификаторов. Таким образом, в простом коде сокращения кода я бы использовал класс MultipleOutputs, и я бы сделал что-то подобное в редукторе.
protected void reduce(final SomeKey key,
final Iterable<SomeValue> values,
final Context context) {
...
for (SomeValue value: values) {
String bucketId = computeBucketIdFrom(...);
multipleOutputs.write(key, value, folderName + "/" + bucketId);
...
Так что я думаю, что это можно сделать в ожогах
...
val somePipe = Csv(in, separator = "\t",
fields = someSchema,
skipHeader = true)
.read
for (i <- 1 until numberOfBuckets) {
somePipe
.filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i}
.write(Csv(out + "/bucket" + i ,
writeHeader = true,
separator = "\t"))
}
Но я чувствую, что вам придется много раз повторять одну и ту же трубу, и это повлияет на общую производительность.
Есть ли другие альтернативы?
Спасибо
1 ответ
Да, конечно, есть лучший способ использования TemplatedTsv.
Таким образом, ваш код выше может быть написан следующим образом:
val somePipe = Tsv(in, fields = someSchema, skipHeader = true)
.read
.write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true))
Это поместит все записи из some_id в отдельные папки в папке out/ some_ids.
Однако вы также можете создавать целочисленные сегменты. Просто измените последние строки,
.map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }
.write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket)))
Это создаст двухзначные папки как out/ dd /. Вы также можете проверить TemplatedTsv API здесь.
Может быть небольшая проблема с использованием templatedTsv, то есть редукторы могут генерировать много маленьких файлов, которые могут быть плохими для следующей работы, используя ваши результаты. Поэтому перед записью на диск лучше отсортировать поля шаблона. Я написал блог об этом здесь.