Как исправить "Объединение неограниченных PCollections в настоящее время поддерживается только для неглобальных окон с триггерами" в Apache Beam

Я пытаюсь объединить 2 неограниченных источника, используя Apache Beam Java SDK. При присоединении я получаю сообщение об ошибке ниже.

Исключение в потоке "main" java.lang.UnsupportedOperationException: Присоединение неограниченных PCollections в настоящее время поддерживается только для неглобальных окон с триггерами, которые, как известно, производят вывод один раз за окно, например триггер по умолчанию с нулевым допустимым запаздыванием. В этих случаях Beam может гарантировать, что он объединяет все входные элементы один раз за окно. WindowingStrategy]windowFn=org.apache.beam.sdk.transforms.windowing.SlidingWindows@1b87117, allowLateness=PT0S, trigger=Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane(). } не поддерживается в org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel.verifySupportedTrigger(BeamJoinRel.java:341) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel. получить доступ к $1500(BeamJoinRel.java:98) по адресу org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel$StandardJoin.expand(BeamJoinRel.java:330) по адресу org.apache.beam.sdk.extensions.sql.impl.rel.BeamJoinRel$StandardJoin.expand(BeamJoinRel.java:308) в org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537) в org.apache.beam.sdk.Pipeline.applyTransform(конвейер.java:488) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:67) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlUtil & lambda$ Буй ldPCollectionList$0(BeamSqlRelUtils.java:48) в java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) в java.util.Iterator.forEachRemaining(Iterator.java:116) в java.util.Spiterators IteratorSpliterator.forEachRemaining(Spliterators.java:1801) в java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481) в java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471) в java.471 at..ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) в java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) в java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499) в организации.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:49) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPljlUU) в org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36) в org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:100) в org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:76) в org.apache.beam.sdk.Pipeline.applyInternal (Pipeline.java: 537) в org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488) в org.apache.beam.sdk.values.PCollectionTuple.apply(PCollectionTuple.java:167) в xyz.xyz.main(xyz.java:64)

Я попытался использовать как фиксированное и скользящее окно вместе с триггером (pastEndOfWindow & pastFirstElementInPane) с нулевым допустимым опозданием. Опробовал Accumalate & Discard. Я получаю одно и то же сообщение об ошибке каждый раз.

Ниже приведены 2 фрагмента, которые я пробовал использовать как фиксированное, так и скользящее окно.

p1.apply("window",
    Window
      .<Row>into(FixedWindows.of(Duration.standardSeconds(50)))
      .triggering(AfterWatermark.pastEndOfWindow())
      .withAllowedLateness(Duration.ZERO)
      .accumulatingFiredPanes());
p1.apply("window2",
    Window.<Row>into(
        SlidingWindows
          .of(Duration.standardSeconds(30))
          .every(Duration.standardSeconds(5)))
      .triggering(
        Repeatedly
          .forever(
             AfterProcessingTime
               .pastFirstElementInPane()
               .plusDelayOf(Duration.standardMinutes(1))))
      .withAllowedLateness(Duration.ZERO)
      .discardingFiredPanes());

Я просто хотел реализовать преобразование sql со скользящим окном, триггером с задержкой и разрешением задержки. Пожалуйста, проведите меня до конца.

Спасибо гаутам

2 ответа

Решение

Из комментария, если я правильно понимаю, желаемое поведение:

  • объединить два потока;
  • генерировать результаты каждые 30 секунд в реальном времени;
  • если данные не могут быть сопоставлены, подождите соответствующую соответствующую запись в течение 30 минут максимум;
  • сбросить записи через 30 минут;

По сути, это своего рода непрерывное скользящее сопоставление последних 30 минут данных в обоих потоках, и результаты выдаются каждые 30 секунд.

Хорошей новостью является то, что должна быть возможность реализовать в Beam Java (возможно, и в Python). Плохая новость - это, вероятно, было бы нетривиально в Java, и я не думаю, что сейчас это вообще возможно в SQL.

Как это могло бы выглядеть:

  • ввод должен быть в глобальном окне;
  • иметь состояние ParDo (или это), который отслеживает все видимые элементы, сохраняя их в ячейке состояния:
    • вам, вероятно, потребуется использовать боковой ввод или применить CoGroupByKey заранее иметь доступ к элементам с обоих входов в одном ParDo;
    • боковые входы и CoGroupByKey имеют различную семантику и с ней может быть нелегко работать;
  • на каждом входе вручную проверяйте состояние для соответствующих записей;
  • либо сразу отправлять результаты, либо хранить их в другой ячейке состояния;
  • есть таймер, который будет очищать старые несопоставленные записи:
    • вам может понадобиться вручную отслеживать метки времени и другие вещи;
  • применить желаемое окно / триггер к выходу при необходимости;

Я предлагаю вам прочитать этот пример, он выполняет таймер и часть состояния того, что вам нужно (он ожидает сопоставления записей, сохраняет несопоставленные записи в состоянии и очищает состояние при срабатывании таймера) и использует CoGroupByKey, Возможно, вы поймете, как это работает, после того, как поймете этот пример.

До сих пор (2.13.0) BeamSQL не поддерживал неограниченное объединение неограниченных PCollections с триггерами не по умолчанию. Для таких объединений разрешен только триггер по умолчанию (поэтому в каждом окне будет только один результат).

Основная причина заключается в том, что в текущей реализации Beam Java SDK отсутствует механизм (который называется втягиванием и накоплением) для уточнения данных в таких случаях, как Join.

Другие вопросы по тегам