Как удалить дубликаты в скользящем окне - Apache Beam

Я реализовал конвейер данных с несколькими неограниченными источниками и боковыми входами, соединил данные со скользящим окном (30 с и каждые 10 с) и выдал преобразованный вывод в тему Кафки. Проблема, с которой я столкнулся, заключается в том, что данные, полученные в первые 10 секунд окна, передаются 3 раза (то есть), запускаются каждый раз, когда начинается новое окно, пока не будет завершено первое окно. Как передать преобразованные данные только один раз или избежать дублирования?

Я использовал сброшенные панели, и это не имеет значения. Всякий раз, когда я пытаюсь установить поведение закрытия окна как FIRE_ALWAYS/FIRE_IF_NON_EMPTY, он выдает следующую ошибку.

Исключение в потоке "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: пустое PCollection, доступное как одноэлементное представление. Рассмотрите возможность установки с помощью Default для предоставления значения по умолчанию в org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:332) в org.apache.beam.runners.direct.DirectRunner $ DirectPipelineResult.waitUn. Java:302) в org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:197) в org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:64) в org.apache.beam.sdk.Pipeline.run(Pipeline.java:313) в org.apache.beam.sdk.Pipeline.run(Pipeline.java:299) в y.yyy.main(yyy.java:86) Причина: java.lang.IllegalArgumentException: пустая PCollection доступна как одноэлементное представление. Рассмотрите возможность установки с помощью Default, чтобы предоставить значение по умолчанию в org.apache.beam.sdk.transforms.View$SingletonCombineFn.identity(View.java:378) в org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.extractOutput(Combine. Java:481) в org.apache.beam.sdk.transforms.Combine$BinaryCombineFn.extractOutput(Combine.java:429) в org.apache.beam.sdk.transforms.Combine$CombineFn.apply(Combine.java:387) в org.apache.beam.sdk.transforms.Combine$GroupedValues ​​$1.processElement(Combine.java:2089)

data.apply("Transform", ParDo.of(
  new DoFn<String, Row>() {

    private static final long serialVersionUID = 1L;

    @ProcessElement
    public void processElement(
      ProcessContext processContext,
      final OutputReceiver<Row> emitter) {

        String record = processContext.element();
        final String[] parts = record.split(",");
        emitter.output(Row.withSchema(sch).addValues(parts).build());
    }
  })).apply(
    "window1",
    Window
      .<Row>into(
        SlidingWindows
          .of(Duration.standardSeconds(30))
          .every(Duration.standardSeconds(10)))
      .withAllowedLateness(
        Duration.ZERO,
        Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
  .discardingFiredPanes());

Пожалуйста, помогите мне вызвать окно только один раз (то есть) я не хочу отправлять записи, которые уже обработаны

Обновление: вышеприведенная ошибка для бокового ввода часто возникает, и это не из-за окон, похоже на проблему в Apache Beam ( https://issues.apache.org/jira/browse/BEAM-6086)

Я попытался использовать State для определения, обработана ли строка или нет, но состояние не сохраняется или не устанавливается. (т.е.) я всегда получаю нулевое значение при чтении состояния.

public class CheckState extends DoFn<KV<String,String>,KV<Integer,String>> {
  private static final long serialVersionUID = 1L;

  @StateId("count")
  private final StateSpec<ValueState<String>> countState =
                     StateSpecs.value(StringUtf8Coder.of());

  @ProcessElement
  public void processElement(
    ProcessContext processContext,
    @StateId("count") ValueState<String> countState) {

        KV<String,String> record = processContext.element();
        String row = record.getValue();
        System.out.println("State: " + countState.read());
        System.out.println("Setting state as "+ record.getKey() + " for value"+ row.split(",")[0]);
        processContext.output(KV.of(current, row));
        countState.write(record.getKey());
    }

Спасибо,

1 ответ

Если я правильно понял проблему, это может быть связано с использованием раздвижных окон в конвейере:

Скользящее временное окно перекрывается, хорошее объяснение из направляющих луча Функции окна

"Поскольку несколько окон перекрываются, большинство элементов в наборе данных будут принадлежать более чем одному окну. Этот вид управления окнами полезен для получения скользящих средних данных;..."

Фиксированные окна, однако, не будут перекрываться:

"Фиксированное временное окно представляет постоянную длительность, непересекающийся временной интервал в потоке данных..."

Другие вопросы по тегам