Flink: как сохранить и восстановить ValueState

Я использую Flink, чтобы обогатить поток входов

case class Input( key: String, message: String )

с предварительно вычисленными баллами

case class Score( key: String, score: Int )

и произвести вывод

case class Output( key: String, message: String, score: Int )

Оба потока ввода и оценки считываются из тем Kafka, а полученный поток вывода также публикуется в Kafka

val processed = inputStream.connect( scoreStream )
                           .flatMap( new ScoreEnrichmentFunction )
                           .addSink( producer )

со следующей функцией ScoreEnrichmentFunction:

class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output]
{
    val scoreStateDescriptor = new ValueStateDescriptor[Score]( "saved scores", classOf[Score] )
    lazy val scoreState: ValueState[Score] = getRuntimeContext.getState( scoreStateDescriptor )

    override def flatMap1( input: Input, out: Collector[Output] ): Unit = 
    {
        Option( scoreState.value ) match {
            case None => out.collect( Output( input.key, input.message, -1 ) )
            case Some( score ) => out.collect( Output( input.key, input.message, score.score ) )  
        }
    }

    override def flatMap2( score: Score, out: Collector[Output] ): Unit = 
    {
        scoreState.update( score )
    } 
}

Это хорошо работает. Однако, если я возьму безопасную точку и отменим задание Flink, результаты, сохраненные в ValueState, будут потеряны, когда я продолжу работу с точки сохранения.

Как я понимаю, похоже, что ScoreEnrichmentFunction необходимо расширить с помощью CheckPointedFunction

class ScoreEnrichmentFunction extends RichCoFlatMapFunction[Input, Score, Output] with CheckpointedFunction

но я изо всех сил пытаюсь понять, как реализовать методы snapshotState и initializeState для работы с состоянием ключа

override def snapshotState( context: FunctionSnapshotContext ): Unit = ???


override def initializeState( context: FunctionInitializationContext ): Unit = ???

Обратите внимание, что я использую следующий env:

val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism( 2 )
    env.setBufferTimeout( 1 )
    env.enableCheckpointing( 1000 )
    env.getCheckpointConfig.enableExternalizedCheckpoints( ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION )
    env.getCheckpointConfig.setCheckpointingMode( CheckpointingMode.EXACTLY_ONCE )
    env.getCheckpointConfig.setMinPauseBetweenCheckpoints( 500 )
    env.getCheckpointConfig.setCheckpointTimeout( 60000 )
    env.getCheckpointConfig.setFailOnCheckpointingErrors( false )
    env.getCheckpointConfig.setMaxConcurrentCheckpoints( 1 )

1 ответ

Я думаю, что нашел проблему. Я пытался использовать отдельные каталоги для контрольных точек и точек сохранения, в результате чего каталог точек сохранения и каталог FsStateBackend были разными.

Используя тот же каталог в

val backend = new FsStateBackend( "file:/data", true )
env.setStateBackend( backend )

и при получении точки сохранения

bin/flink cancel d75f4712346cadb4df90ec06ef257636 -s file:/data

решает проблему.

Другие вопросы по тегам