Триггер окна Cloud Dataflow перезаписывает значение из закрытого окна

Я пишу поток данных (Beam SDK 2.0.0), который читает из Pub/Sub, подсчитывает элементы в окне, а затем сохраняет их в BigTable в качестве временных рядов. Окна фиксируются на продолжительности 1 минуты.

Мое намерение состояло в том, чтобы обновлять значение текущего окна каждую секунду, используя триггер, чтобы получать обновления в реальном времени о текущем временном окне.

Но это не похоже на работу. Значение корректно обновляется каждую секунду, но как только поток данных начинает работать в следующую минуту, первый обновляется до нуля. Так что в основном только мое последнее значение верно, все остальное - ноль.

Pipeline pipeline = Pipeline.create(options);

PCollection<String> live = pipeline
        .apply("Read from PubSub", PubsubIO.readStrings()
        .fromSubscription("projects/..."))
        .apply("Window per minute",
            Window
                .<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(Repeatedly
                    .forever(AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(1)))                                         
                    .orFinally(AfterWatermark.pastEndOfWindow()))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.ZERO)
            );

Я пытался играть с кодом триггера, но ничего не помогает. Мои единственные варианты прямо сейчас, чтобы удалить весь .trigger блок. Кто-нибудь испытывал подобное поведение?

1 ответ

Решение

После сообщения о моей проблеме в Google, они обнаружили некоторые проблемы в Beam SDK, которые вызывают это. Подробнее по этим ссылкам:

Когда таймеры EOW и GC срабатывают вместе (допускается ненулевое опоздание), мы не замечаем, что это последняя панель: https://issues.apache.org/jira/browse/BEAM-2505

Таймеры времени обработки не игнорируются должным образом, если они входят с таймером ГХ: https://issues.apache.org/jira/browse/BEAM-2502

Таймеры обработки просто интерпретируются как таймеры GC, совершенно неправильно сравнивая временные метки из разных временных доменов: https://issues.apache.org/jira/browse/BEAM-2504

Другие вопросы по тегам