Пример каскадного группового
Мне недавно пришлось сделать каскадную работу. У меня никогда не было опыта работы с распределенными системами, поэтому у меня возникли проблемы с пониманием того, как заставить это работать.
У меня есть файл конфигурации, который имеет кучу ведер:
Bucket{
bucket_name: "X"
input_path: "s3://..."
key_column: 1
value_column: 2
multivalue: false
default_value:
type_column: int
}
...
По сути, все, что мне нужно сделать, - это использовать его для сбора нескольких файлов (каждый из них похож на таблицу tsv, в которой сопоставляются ключи URL с некоторым значением), а затем группировать по URL.
В общем, вот как выглядит план:
A --> |group |
B --> |by |--> output
C --> |url |
Мне было интересно, если следующая логика верна: 1) Мне нужно создать кран для каждого из ведер, т.е.
Tap inputTap = new GlobHfs(new TextLine(), bucket.getInputPath());
2) Мне нужно создать КАЖДУЮ трубу из всех каналов (в этой части я не уверен, нужна ли мне каждая труба, каким должен быть мой фильтр / функция?). Прямо сейчас я создал Every Pipe, который разделяет линии на вкладки.
RegexSplitGenerator splitter = new RegexSplitGenerator("\t");
Pipe tokenizedPipe = bucket.getBucketName(), new Field("Line"), splitter));
3) Создать групповую трубу, которая объединяет все эти токенизированные трубы. Я не совсем уверен, как заставить групповой канал выбирать только ключевые столбцы, но метод, который я сейчас использую:
Pipe finalPipe = new Groupby("Output Pipe", inputPipes, groupFields);
Так это правильная логика для решения этой проблемы? Или некоторые из моих шагов избыточны или неверны?
Спасибо!
1 ответ
Ваша мысль выглядит хорошо для меня. Шаг 2 можно пропустить, если вы разделяете записи, когда кран читает входные файлы.
Tap inputTap = new Hfs(new TextDelimited("\t"), inputPath);
Каскадирование для Impatient охватывает, как реализовать то, что вы хотите достичь, на это стоит взглянуть.