Триггер окна Cloud Dataflow перезаписывает значение из закрытого окна
Я пишу поток данных (Beam SDK 2.0.0), который читает из Pub/Sub, подсчитывает элементы в окне, а затем сохраняет их в BigTable в качестве временных рядов. Окна фиксируются на продолжительности 1 минуты.
Мое намерение состояло в том, чтобы обновлять значение текущего окна каждую секунду, используя триггер, чтобы получать обновления в реальном времени о текущем временном окне.
Но это не похоже на работу. Значение корректно обновляется каждую секунду, но как только поток данных начинает работать в следующую минуту, первый обновляется до нуля. Так что в основном только мое последнее значение верно, все остальное - ноль.
Pipeline pipeline = Pipeline.create(options);
PCollection<String> live = pipeline
.apply("Read from PubSub", PubsubIO.readStrings()
.fromSubscription("projects/..."))
.apply("Window per minute",
Window
.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(1)))
.orFinally(AfterWatermark.pastEndOfWindow()))
.accumulatingFiredPanes()
.withAllowedLateness(Duration.ZERO)
);
Я пытался играть с кодом триггера, но ничего не помогает. Мои единственные варианты прямо сейчас, чтобы удалить весь .trigger
блок. Кто-нибудь испытывал подобное поведение?
1 ответ
После сообщения о моей проблеме в Google, они обнаружили некоторые проблемы в Beam SDK, которые вызывают это. Подробнее по этим ссылкам:
Когда таймеры EOW и GC срабатывают вместе (допускается ненулевое опоздание), мы не замечаем, что это последняя панель: https://issues.apache.org/jira/browse/BEAM-2505
Таймеры времени обработки не игнорируются должным образом, если они входят с таймером ГХ: https://issues.apache.org/jira/browse/BEAM-2502
Таймеры обработки просто интерпретируются как таймеры GC, совершенно неправильно сравнивая временные метки из разных временных доменов: https://issues.apache.org/jira/browse/BEAM-2504