GroupIntoBatches для не-KV элементов
Согласно документации Apache Beam 2.0.0 SDK GroupIntoBatches
работает только с KV
коллекции.
Мой набор данных содержит только значения, и нет необходимости вводить ключи. Тем не менее, чтобы использовать GroupIntoBatches
Мне пришлось реализовать "поддельные" ключи с пустой строкой в качестве ключа:
static class FakeKVFn extends DoFn<String, KV<String, String>> {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(KV.of("", c.element()));
}
}
Таким образом, общий конвейер выглядит следующим образом:
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
long batchSize = 100L;
p.apply("ReadLines", TextIO.read().from("./input.txt"))
.apply("FakeKV", ParDo.of(new FakeKVFn()))
.apply(GroupIntoBatches.<String, String>ofSize(batchSize))
.setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))
.apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
c.output(callWebService(c.element().getValue()));
}
}))
.apply("WriteResults", TextIO.write().to("./output/"));
p.run().waitUntilFinish();
}
Есть ли способ группировать по партиям, не вводя "поддельные" ключи?
1 ответ
Требуется предоставить входы KV для GroupIntoBatches
потому что преобразование реализовано с использованием состояния и таймеров, которые для каждого ключа и окна.
Для каждой пары "ключ + окно" состояние и таймеры обязательно выполняются последовательно (или, по-видимому, так). Вы должны вручную выразить доступный параллелизм, предоставив ключи (и окна, хотя ни один бегун, о котором я знаю, не распараллеливает на сегодня окна). Два наиболее распространенных подхода:
- Используйте какой-нибудь естественный ключ, такой как идентификатор пользователя
- Выберите некоторое фиксированное количество осколков и напишите случайным образом. Это может быть сложнее настроить. Вы должны иметь достаточно шардов, чтобы получить достаточный параллелизм, но каждый шард должен включать достаточно данных, которые
GroupIntoBatches
на самом деле полезно.
Добавление одного фиктивного ключа ко всем элементам, как в вашем фрагменте, приведет к тому, что преобразование вообще не будет выполняться параллельно. Это похоже на обсуждение индексации с отслеживанием состояния, при котором ParDo запускается однопоточным в Dataflow Runner.