Flink: как сохранить и восстановить ValueState
Я использую Flink, чтобы обогатить поток входов
case class Input( key: String, message: String )
с предварительно вычисленными баллами
case class Score( key: String, score: Int )
и произвести вывод
case class Output( key: String, message: String, score: Int )
Оба потока ввода и оценки считываются из тем Kafka, а полученный поток вывода также публикуется в Kafka
val processed = inputStream.connect( scoreStream )
.flatMap( new ScoreEnrichmentFunction )
.addSink( producer )
со следующей функцией ScoreEnrichmentFunction:
class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output]
{
val scoreStateDescriptor = new ValueStateDescriptor[Score]( "saved scores", classOf[Score] )
lazy val scoreState: ValueState[Score] = getRuntimeContext.getState( scoreStateDescriptor )
override def flatMap1( input: Input, out: Collector[Output] ): Unit =
{
Option( scoreState.value ) match {
case None => out.collect( Output( input.key, input.message, -1 ) )
case Some( score ) => out.collect( Output( input.key, input.message, score.score ) )
}
}
override def flatMap2( score: Score, out: Collector[Output] ): Unit =
{
scoreState.update( score )
}
}
Это хорошо работает. Однако, если я возьму безопасную точку и отменим задание Flink, результаты, сохраненные в ValueState, будут потеряны, когда я продолжу работу с точки сохранения.
Как я понимаю, похоже, что ScoreEnrichmentFunction необходимо расширить с помощью CheckPointedFunction
class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output] with CheckpointedFunction
но я изо всех сил пытаюсь понять, как реализовать методы snapshotState и initializeState для работы с состоянием ключа
override def snapshotState( context: FunctionSnapshotContext ): Unit = ???
override def initializeState( context: FunctionInitializationContext ): Unit = ???
Обратите внимание, что я использую следующий env:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism( 2 )
env.setBufferTimeout( 1 )
env.enableCheckpointing( 1000 )
env.getCheckpointConfig.enableExternalizedCheckpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION )
env.getCheckpointConfig.setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE )
env.getCheckpointConfig.setMinPauseBetweenCheckpoints( 500 )
env.getCheckpointConfig.setCheckpointTimeout( 60000 )
env.getCheckpointConfig.setFailOnCheckpointingErrors( false )
env.getCheckpointConfig.setMaxConcurrentCheckpoints( 1 )
1 ответ
Я думаю, что нашел проблему. Я пытался использовать отдельные каталоги для контрольных точек и точек сохранения, в результате чего каталог точек сохранения и каталог FsStateBackend были разными.
Используя тот же каталог в
val backend = new FsStateBackend( "file:/data", true )
env.setStateBackend( backend )
и при получении точки сохранения
bin/flink cancel d75f4712346cadb4df90ec06ef257636 -s file:/data
решает проблему.