Flink: как обрабатывать изменения конфигурации внешнего приложения в Flink

Мое требование - потоковая передача миллионов записей в день, и это сильно зависит от параметров внешней конфигурации. Например, пользователь может в любой момент изменить требуемый параметр в веб-приложении, и после внесения изменений должна произойти потоковая передача с параметрами конфигурации нового приложения. Это конфигурации на уровне приложения, и у нас также есть некоторые динамические параметры исключения, через которые должны быть переданы и отфильтрованы все данные.

Я вижу, что у flink нет глобального состояния, которое является общим для всех менеджеров задач и подзадач. Наличие централизованного кэша - вариант, но для каждого параметра мне придется читать его из кэша, что увеличит задержку. Посоветуйте, пожалуйста, лучший подход для обработки таких сценариев и как другие приложения справляются с этим. Благодарю.

1 ответ

Решение

Обновление конфигурации работающего потокового приложения является распространенным требованием. В Flink DataStream API это можно сделать с помощью так называемого CoFlatMapFunction который обрабатывает два входных потока. Один из потоков может быть потоком данных, а другой - потоком управления.

В следующем примере показано, как динамически адаптировать пользовательскую функцию, которая отфильтровывает строки, длина которых превышает определенную.

val data: DataStream[String] = ???
val control: DataStream[Int] = ???

val filtered: DataStream[String] = data
  // broadcast all control messages to the following CoFlatMap subtasks
  .connect(control.broadcast)
  // process data and control messages
  .flatMap(new DynLengthFilter)


class DynLengthFilter extends CoFlatMapFunction[String, Int, String] with Checkpointed[Int] {

  var length = 0

  // filter strings by length
  override def flatMap1(value: String, out: Collector[String]): Unit = {
    if (value.length < length) {
      out.collect(value)
    }
  }

  // receive new filter length
  override def flatMap2(value: Int, out: Collector[String]): Unit = {
    length = value
  }

  override def snapshotState(checkpointId: Long, checkpointTimestamp: Long): Int = length

  override def restoreState(state: Int): Unit = {
    length = state
  }
}

DynLengthFilter Пользовательская функция реализует Checkpointed Интерфейс для длины фильтра. В случае сбоя эта информация автоматически восстанавливается.

Другие вопросы по тегам