GroupIntoBatches для не-KV элементов

Согласно документации Apache Beam 2.0.0 SDK GroupIntoBatches работает только с KV коллекции.

Мой набор данных содержит только значения, и нет необходимости вводить ключи. Тем не менее, чтобы использовать GroupIntoBatches Мне пришлось реализовать "поддельные" ключи с пустой строкой в ​​качестве ключа:

static class FakeKVFn extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(KV.of("", c.element()));
  }
}

Таким образом, общий конвейер выглядит следующим образом:

public static void main(String[] args) {
  PipelineOptions options = PipelineOptionsFactory.create();
  Pipeline p = Pipeline.create(options);

  long batchSize = 100L;

  p.apply("ReadLines", TextIO.read().from("./input.txt"))
      .apply("FakeKV", ParDo.of(new FakeKVFn()))
      .apply(GroupIntoBatches.<String, String>ofSize(batchSize))
      .setCoder(KvCoder.of(StringUtf8Coder.of(), IterableCoder.of(StringUtf8Coder.of())))
      .apply(ParDo.of(new DoFn<KV<String, Iterable<String>>, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          c.output(callWebService(c.element().getValue()));
        }
      }))
      .apply("WriteResults", TextIO.write().to("./output/"));

  p.run().waitUntilFinish();
}

Есть ли способ группировать по партиям, не вводя "поддельные" ключи?

1 ответ

Решение

Требуется предоставить входы KV для GroupIntoBatches потому что преобразование реализовано с использованием состояния и таймеров, которые для каждого ключа и окна.

Для каждой пары "ключ + окно" состояние и таймеры обязательно выполняются последовательно (или, по-видимому, так). Вы должны вручную выразить доступный параллелизм, предоставив ключи (и окна, хотя ни один бегун, о котором я знаю, не распараллеливает на сегодня окна). Два наиболее распространенных подхода:

  1. Используйте какой-нибудь естественный ключ, такой как идентификатор пользователя
  2. Выберите некоторое фиксированное количество осколков и напишите случайным образом. Это может быть сложнее настроить. Вы должны иметь достаточно шардов, чтобы получить достаточный параллелизм, но каждый шард должен включать достаточно данных, которые GroupIntoBatches на самом деле полезно.

Добавление одного фиктивного ключа ко всем элементам, как в вашем фрагменте, приведет к тому, что преобразование вообще не будет выполняться параллельно. Это похоже на обсуждение индексации с отслеживанием состояния, при котором ParDo запускается однопоточным в Dataflow Runner.

Другие вопросы по тегам