Как создать пользовательский Combine.PerKey в луче SDK 2.0

Мы выяснили, как создать пользовательскую функцию объединения (после большого количества догадок и чтения кода луча SDK 2.0) в луче SDK 2.0, так как синтаксис потока данных SDK 1.x не работал в SDK 2.0.

Тем не менее, мы не можем понять, как создать собственную функцию PER KEY в Beam SDK 2.0. Любая помощь или указатели (или лучше еще фактический пример) будет принята с благодарностью. (Мы искали в Интернете документацию или примеры и не нашли ни одного; мы также попытались взглянуть на код в классе Combine луча SDK 2.0, но не смогли его выяснить, тем более что класс PerKey теперь имеет частный конструктор, поэтому мы не можем продлить это больше.)

В случае, если это поможет, вот как мы правильно создали пользовательские ключи комбинирования (без) в луче SDK 2.0, но мы не можем понять, как создать ключ с ключом:

public class CombineTemplateIntervalsIntoBlocks
        extends Combine.AccumulatingCombineFn<ImmutableMySetOfIntervals, TemplateIntervalAccum, ArrayList<ImmutableMySetOfIntervals>>{


    public CombineTemplateIntervalsIntoBlocks() {
    }

    @Override
    public TemplateIntervalAccum createAccumulator() {
        return new TemplateIntervalAccum()
    }

а потом

public class TemplateIntervalAccum
        implements Combine.AccumulatingCombineFn.Accumulator<ImmutableMySetOfIntervals, TemplateIntervalAccum, ArrayList<ImmutableMySetOfIntervals>>, Serializable {
...

1 ответ

Решение

Вам не нужно создавать свой CombineFn иначе, чтобы использовать Combine.PerKey.

Вы можете продлить либо AccumulatingCombineFn (который помещает логику слияния в аккумулятор) или расширить CombineFn (который ставит логику слияния в CombineFn). Есть и другие варианты, такие как BinaryCombineFn а также IterableCombineFn,

Скажи, что у тебя есть CombineFn<InputT, AccumT, OutputT> называется combineFn:

  • Ты можешь использовать Combine.globally(combineFn) создать PTransform это занимает PCollection<InputT> и объединяет все элементы.
  • Или вы можете использовать Combine.perKey(combineFn) создать PTransform это занимает PCollection<KV<K, InputT>> и объединяет все значения, связанные с каждым ключом, и объединяет их. Это соответствует Combine.PerKey Я полагаю, что вы имеете в виду.
Другие вопросы по тегам