Подтверждение сообщения Google Pub/Sub на Apache Beam
Я пытаюсь прочитать из pub/sub с помощью следующего кода
Read<String> pubsub = PubsubIO.<String>read().topic("projects/<projectId>/topics/<topic>").subscription("projects/<projectId>/subscriptions/<subscription>").withCoder(StringUtf8Coder.of()).withAttributes(new SimpleFunction<PubsubMessage,String>() {
@Override
public String apply(PubsubMessage input) {
LOG.info("hola " + input.getAttributeMap());
return new String(input.getMessage());
}
});
PCollection<String> pps = p.apply(pubsub)
.apply(
Window.<String>into(
FixedWindows.of(Duration.standardSeconds(15))));
pps.apply("printdata",ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
LOG.info("hola amigo "+c.element());
c.output(c.element());
}
}));
По сравнению с тем, что я получаю на NodeJS, я получаю сообщение, которое будет содержаться в data
поле. Как я могу получить ackId
поле (которое я могу позже использовать для подтверждения сообщения)? Карта атрибутов, которую я печатаю, null
, Есть ли какой-нибудь другой способ подтвердить все сообщения, не выясняя ackId?
1 ответ
PubsubIO
читатель несет ответственность за подтверждение сообщений. Это связано с контрольным поведением бегуна. В частности, источник будет подтверждать сообщения только тогда, когда результирующие элементы были отмечены контрольными точками.
В этом случае вы должны смотреть, когда контрольные точки бегущего Flink сообщают о состоянии этого источника. Я считаю, что это связано с конфигурацией Flink для частоты контрольных точек.