Как выполнить контрольную точку в Apache Beam при использовании flink runner?
Я читаю из несвязанного источника (Kafka) и пишу его количество слов в другую тему Kafka. Теперь я хочу выполнить контрольную точку в Beam Pipeline. Я выполнил все инструкции в документации apache beam, но каталог контрольных точек не создается даже после этого.
Ниже приведены параметры, которые я использовал для конвейера:-
--runner=FlinkRunner
--streaming=true
--parallelism=2
--checkpointingInterval=1000
--checkpointTimeoutMillis=5000
--minPauseBetweenCheckpoints=500
--externalizedCheckpointsEnabled=true
--retainExternalizedCheckpointsOnCancellation=true
может кто-нибудь помочь мне с установкой контрольных точек?
2 ответа
Я работал над решением, так что вы можете изменить путь checkpoint.state.dir в flink-conf.yaml кластера ссылок, а другой - с помощью flinkPipelineOptions-
@Description(
"Sets the state backend factory to use in streaming mode. "
+ "Defaults to the flink cluster's state.backend configuration.")
Class<? extends FlinkStateBackendFactory> getStateBackendFactory();
void setStateBackendFactory(Class<? extends FlinkStateBackendFactory> stateBackendFactory);
и установив setStateBackendFactory (я использовал собственный класс)
static class bakend implements FlinkStateBackendFactory{
@Override
public StateBackend createStateBackend(FlinkPipelineOptions options) {
return new FsStateBackend("file:///Users/myPc/word-count-beam/src/checkpoint/");
}
}
это создаст checkpointDir, вам также необходимо установить значение checkpointinginterval для включения контрольной точки.
Я знаю, что это старо, но хочу согласиться с вашим ответом. в 2019 году мы создали докеризованный флинк, а также луч и работу с этими опциями.
--runner=FlinkRunner --streaming=true --checkpointingInterval=30000 --env=dev
и мы настроили в conf.yml с rockdb в качестве серверной части.