Как выполнить контрольную точку в Apache Beam при использовании flink runner?

Я читаю из несвязанного источника (Kafka) и пишу его количество слов в другую тему Kafka. Теперь я хочу выполнить контрольную точку в Beam Pipeline. Я выполнил все инструкции в документации apache beam, но каталог контрольных точек не создается даже после этого.

Ниже приведены параметры, которые я использовал для конвейера:-

--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true

может кто-нибудь помочь мне с установкой контрольных точек?

2 ответа

Я работал над решением, так что вы можете изменить путь checkpoint.state.dir в flink-conf.yaml кластера ссылок, а другой - с помощью flinkPipelineOptions-

        @Description(
                "Sets the state backend factory to use in streaming mode. "
                        + "Defaults to the flink cluster's state.backend configuration.")
        Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
        void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);

и установив setStateBackendFactory (я использовал собственный класс)

  static class  bakend implements FlinkStateBackendFactory{

        @Override
        public StateBackend createStateBackend(FlinkPipelineOptions options) {
            return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");

        }
    }

это создаст checkpointDir, вам также необходимо установить значение checkpointinginterval для включения контрольной точки.

Я знаю, что это старо, но хочу согласиться с вашим ответом. в 2019 году мы создали докеризованный флинк, а также луч и работу с этими опциями.

      --runner=FlinkRunner --streaming=true --checkpointingInterval=30000 --env=dev

и мы настроили в conf.yml с rockdb в качестве серверной части.