Потоковые группы мутации в гаечный ключ
Я пытаюсь направить MutationGroups в гаечный ключ с SpannerIO. Цель состоит в том, чтобы писать новые MuationGroups каждые 10 секунд, так как мы будем использовать гаечный ключ для запроса KPI ближайшего времени.
Когда я не использую никаких окон, я получаю следующую ошибку:
Exception in thread "main" java.lang.IllegalStateException: GroupByKey cannot be applied to non-bounded PCollection in the GlobalWindow without a trigger. Use a Window.into or Window.triggering transform prior to GroupByKey.
at org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173)
at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:204)
at org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:120)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1585)
at org.apache.beam.sdk.transforms.Combine$PerKey.expand(Combine.java:1470)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:868)
at org.apache.beam.sdk.io.gcp.spanner.SpannerIO$WriteGrouped.expand(SpannerIO.java:823)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)
at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:52)
at quantum.base.transform.entity.spanner.SpannerProtoWrite.expand(SpannerProtoWrite.java:20)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:388)
at quantum.entitybuilder.pipeline.EntityBuilderPipeline$Write$SpannerWrite.expand(EntityBuilderPipeline.java:372)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:491)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:299)
at quantum.entitybuilder.pipeline.EntityBuilderPipeline.main(EntityBuilderPipeline.java:122)
:entityBuilder FAILED
Из-за вышеуказанной ошибки я предполагаю, что входная коллекция должна быть оконной и запущенной, так как SpannerIO использует GroupByKey (это также то, что мне нужно для моего варианта использования):
...
.apply("1-minute windows", Window.<MutationGroup>into(FixedWindows.of(Duration.standardMinutes(1)))
.triggering(Repeatedly.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(10))
).orFinally(AfterWatermark.pastEndOfWindow()))
.discardingFiredPanes()
.withAllowedLateness(Duration.ZERO))
.apply(SpannerIO.write()
.withProjectId(entityConfig.getSpannerProject())
.withInstanceId(entityConfig.getSpannerInstance())
.withDatabaseId(entityConfig.getSpannerDb())
.grouped());
Когда я делаю это, я получаю следующие исключения во время выполнения:
java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn$1.getSideInputWindow(PartitioningWindowFn.java:49)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext$StepContext.issueSideInputFetch(StreamingModeExecutionContext.java:631)
com.google.cloud.dataflow.worker.StreamingModeExecutionContext$UserStepContext.issueSideInputFetch(StreamingModeExecutionContext.java:683)
com.google.cloud.dataflow.worker.StreamingSideInputFetcher.storeIfBlocked(StreamingSideInputFetcher.java:182)
com.google.cloud.dataflow.worker.StreamingSideInputDoFnRunner.processElement(StreamingSideInputDoFnRunner.java:71)
com.google.cloud.dataflow.worker.SimpleParDoFn.processElement(SimpleParDoFn.java:323)
com.google.cloud.dataflow.worker.util.common.worker.ParDoOperation.process(ParDoOperation.java:43)
com.google.cloud.dataflow.worker.util.common.worker.OutputReceiver.process(OutputReceiver.java:48)
com.google.cloud.dataflow.worker.SimpleParDoFn$1.output(SimpleParDoFn.java:271)
org.apache.beam.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:219)
org.apache.beam.runners.core.SimpleDoFnRunner.access$700(SimpleDoFnRunner.java:69)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:517)
org.apache.beam.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:505)
org.apache.beam.sdk.values.ValueWithRecordId$StripIdsDoFn.processElement(ValueWithRecordId.java:145)
После дальнейшего расследования, похоже, это связано с .apply(Wait.on(input))
в SpannerIO: он имеет глобальный боковой ввод, который, кажется, не работает с моими фиксированными окнами, как документы Wait.java
государство:
If signal is globally windowed, main input must also be. This typically would be useful
* only in a batch pipeline, because the global window of an infinite PCollection never
* closes, so the wait signal will never be ready.
В качестве временного решения я попробовал следующее:
добавить GlobalWindow с триггерами вместо фиксированных окон:
.apply("globalwindow", Window.<MutationGroup>into(new GlobalWindows()) .triggering(Repeatedly.forever(AfterProcessingTime .pastFirstElementInPane() .plusDelayOf(Duration.standardSeconds(10)) ).orFinally(AfterWatermark.pastEndOfWindow())) .discardingFiredPanes() .withAllowedLateness(Duration.ZERO))
Это приводит к записи в гаечный ключ, только когда я истощаю свой конвейер. У меня сложилось впечатление
Wait.on()
Сигнал срабатывает только при закрытии глобальных окон и не работает с триггерами.Отключить
.apply(Wait.on(input))
в SpannerIO:Это приводит к тому, что конвейер застревает при создании представления, которое описано в этом сообщении SO: SpannerIO Dataflow 2.3.0 застрял в CreateDataflowView.
Когда я проверяю рабочие журналы на наличие подсказок, я получаю следующие предупреждения:
logger: "org.apache.beam.sdk.coders.SerializableCoder" message: "Can't verify serialized elements of type SpannerSchema have well defined equals method. This may produce incorrect results on some PipelineRunner logger: "org.apache.beam.sdk.coders.SerializableCoder" message: "Can't verify serialized elements of type BoundedSource have well defined equals method. This may produce incorrect results on some PipelineRunner"
Обратите внимание, что все работает с DirectRunner и я пытаюсь использовать DataflowRunner.
У кого-нибудь есть какие-либо предложения по поводу того, что я могу попробовать запустить? Я с трудом могу себе представить, что я единственный, кто пытается направить MutationGroups в гаечный ключ.
Заранее спасибо!
1 ответ
В настоящее время разъем SpannerIO не поддерживается потоковой передачей луча. Пожалуйста, следуйте этому запросу на извлечение, который добавляет потоковую поддержку для разъема ввода-вывода гаечного ключа.