Как удалить дубликаты в скользящем окне - Apache Beam
Я реализовал конвейер данных с несколькими неограниченными источниками и боковыми входами, соединил данные со скользящим окном (30 с и каждые 10 с) и выдал преобразованный вывод в тему Кафки. Проблема, с которой я столкнулся, заключается в том, что данные, полученные в первые 10 секунд окна, передаются 3 раза (то есть), запускаются каждый раз, когда начинается новое окно, пока не будет завершено первое окно. Как передать преобразованные данные только один раз или избежать дублирования?
Я использовал сброшенные панели, и это не имеет значения. Всякий раз, когда я пытаюсь установить поведение закрытия окна как FIRE_ALWAYS/FIRE_IF_NON_EMPTY, он выдает следующую ошибку.
Исключение в потоке "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: пустое PCollection, доступное как одноэлементное представление. Рассмотрите возможность установки с помощью Default для предоставления значения по умолчанию в org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332) в org.apache.beam.runners.direct.DirectRunner $ DirectPipelineResult.waitUn. Java:302) в org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197) в org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64) в org.apache.beam.sdk.Pipeline.run(Pipeline.java:313) в org.apache.beam.sdk.Pipeline.run(Pipeline.java:299) в y.yyy.main(yyy.java:86) Причина: java.lang.IllegalArgumentException: пустая PCollection доступна как одноэлементное представление. Рассмотрите возможность установки с помощью Default, чтобы предоставить значение по умолчанию в org.apache.beam.sdk.transforms.View$SingletonCombineFn.identity(View.java:378) в org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.extractOutput(Combine. Java:481) в org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.extractOutput(Combine.java:429) в org.apache.beam.sdk.transforms.Combine$CombineFn.apply(Combine.java:387) в org.apache.beam.sdk.transforms.Combine$GroupedValues $1.processElement(Combine.java:2089)
data.apply("Transform", ParDo.of(
new DoFn<String, Row>() {
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(
ProcessContext processContext,
final OutputReceiver<Row> emitter) {
String record = processContext.element();
final String[] parts = record.split(",");
emitter.output(Row.withSchema(sch).addValues(parts).build());
}
})).apply(
"window1",
Window
.<Row>into(
SlidingWindows
.of(Duration.standardSeconds(30))
.every(Duration.standardSeconds(10)))
.withAllowedLateness(
Duration.ZERO,
Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
.discardingFiredPanes());
Пожалуйста, помогите мне вызвать окно только один раз (то есть) я не хочу отправлять записи, которые уже обработаны
Обновление: вышеприведенная ошибка для бокового ввода часто возникает, и это не из-за окон, похоже на проблему в Apache Beam ( https://issues.apache.org/jira/browse/BEAM-6086)
Я попытался использовать State для определения, обработана ли строка или нет, но состояние не сохраняется или не устанавливается. (т.е.) я всегда получаю нулевое значение при чтении состояния.
public class CheckState extends DoFn<KV<String,String>,KV<Integer,String>> {
private static final long serialVersionUID = 1L;
@StateId("count")
private final StateSpec<ValueState<String>> countState =
StateSpecs.value(StringUtf8Coder.of());
@ProcessElement
public void processElement(
ProcessContext processContext,
@StateId("count") ValueState<String> countState) {
KV<String,String> record = processContext.element();
String row = record.getValue();
System.out.println("State: " + countState.read());
System.out.println("Setting state as "+ record.getKey() + " for value"+ row.split(",")[0]);
processContext.output(KV.of(current, row));
countState.write(record.getKey());
}
Спасибо,
1 ответ
Если я правильно понял проблему, это может быть связано с использованием раздвижных окон в конвейере:
Скользящее временное окно перекрывается, хорошее объяснение из направляющих луча Функции окна
"Поскольку несколько окон перекрываются, большинство элементов в наборе данных будут принадлежать более чем одному окну. Этот вид управления окнами полезен для получения скользящих средних данных;..."
Фиксированные окна, однако, не будут перекрываться:
"Фиксированное временное окно представляет постоянную длительность, непересекающийся временной интервал в потоке данных..."