Подтверждение сообщения Google Pub/Sub на Apache Beam

Я пытаюсь прочитать из pub/sub с помощью следующего кода

Read<String> pubsub = PubsubIO.<String>read().topic("projects/<projectId>/topics/<topic>").subscription("projects/<projectId>/subscriptions/<subscription>").withCoder(StringUtf8Coder.of()).withAttributes(new SimpleFunction<PubsubMessage,String>() {
    @Override
    public String apply(PubsubMessage input) {
        LOG.info("hola " + input.getAttributeMap());
        return new String(input.getMessage());
    }
});
PCollection<String> pps = p.apply(pubsub)
        .apply(
                Window.<String>into(
                    FixedWindows.of(Duration.standardSeconds(15))));
pps.apply("printdata",ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        LOG.info("hola amigo "+c.element());
        c.output(c.element());
    }
  }));

По сравнению с тем, что я получаю на NodeJS, я получаю сообщение, которое будет содержаться в data поле. Как я могу получить ackId поле (которое я могу позже использовать для подтверждения сообщения)? Карта атрибутов, которую я печатаю, null, Есть ли какой-нибудь другой способ подтвердить все сообщения, не выясняя ackId?

1 ответ

Решение

PubsubIO читатель несет ответственность за подтверждение сообщений. Это связано с контрольным поведением бегуна. В частности, источник будет подтверждать сообщения только тогда, когда результирующие элементы были отмечены контрольными точками.

В этом случае вы должны смотреть, когда контрольные точки бегущего Flink сообщают о состоянии этого источника. Я считаю, что это связано с конфигурацией Flink для частоты контрольных точек.

Другие вопросы по тегам