Получение сообщений из Google Pubsub и публикация их в Kafka
Я пытаюсь использовать сообщения Google PubSub с помощью синхронного PULL API. Это доступно в библиотеке соединителей ввода-вывода Google PubSub для Apache Beam. Я хочу записывать использованные сообщения в Kafka с помощью KafkaIO. Я хочу использовать FlinkRunner для выполнения задания, поскольку мы запускаем это приложение вне GCP.
Проблема, с которой я столкнулся, заключается в том, что использованные сообщения не получают ACK в GCP PubSub. Я подтвердил, что локальный экземпляр Kafka использует сообщения из GCP PubSub. Документация в GCP DataFlow указывает, что пакет данных завершается, когда конвейер завершается приемником данных, которым в моем случае является Kafka.
Но поскольку код выполняется в Apache Flink, а не в GCP DataFlow, я думаю, что какой-то обратный вызов не запускается, связанный с ACK-подтверждением зафиксированного сообщения.
Что я здесь делаю не так?
pipeline
.apply("Read GCP PubSub Messages", PubsubIO.readStrings()
.fromSubscription(subscription)
)
.apply(ParseJsons.of(User.class))
.setCoder(SerializableCoder.of(User.class))
.apply("Filter-1", ParDo.of(new FilterTextFn()))
.apply(AsJsons.of(User.class).withMapper(new ObjectMapper()))
.apply("Write to Local Kafka",
KafkaIO.<Void,String>write()
.withBootstrapServers("127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094")
.withTopic("test-topic")
.withValueSerializer((StringSerializer.class))
.values()
);
2 ответа
Я исправил это решение, воспользовавшись предложением Гийома Блакьера ( guillaume blaquiere) посмотреть на контрольные точки. Даже после добавления функции Window.into() в конвейер исходная конечная точка подписки PubSub не получала ACK.
Проблема была в конфигурации сервера Flink, я не упомянул конфигурацию контрольной точки. Без этих параметров контрольные точки отключены.
state.backend: rocksdb
state.checkpoints.dir: file:///tmp/flink-1.9.3/state/checkpoints/
Эти конфигурации должны находиться в файле flink_home/conf/flink-conf.yaml. После добавления этих записей и перезапуска флинк. Все незакрепленные (незапакованные сообщения) перешли на 0 в диаграмме мониторинга pubsub GCP.
В документации Beam по классу PubSub IO упоминается следующее:
Контрольные точки используются как для подтверждения полученных сообщений обратно в Pubsub (чтобы их можно было удалить на стороне Pubsub), так и для NACK уже использованных сообщений, если контрольную точку необходимо восстановить (чтобы Pubsub отправлял эти сообщения повторно).
ACK не связаны с потоком данных, вы должны иметь такое же поведение с потоком данных. Подтверждения отправляются на контрольно-пропускные пункты. Обычно контрольные точки - это окна, которые вы устанавливаете в потоке вашего потока.
Но вы не установили окно! По умолчанию окна являются глобальными и закрываются только в конце, если вы изящно останавливаете свою работу (и даже, я не уверен в этом). В любом случае, лучшее решение - иметь фиксированные окна (например, 5 минут) для подтверждения сообщений в каждом из этих окон.