Как посчитать, сколько "клиентов" находятся в состоянии с распределенным миганием на основе событий изменения состояния? Мне нужны объекты с состоянием

Я работаю над проектом POC в Java, используя Kafka -> Flink -> эластичный поиск.

На кафке будет производиться непредсказуемое количество событий от 0 до тысяч событий в секунду, например по определенной теме.

{"gid":"abcd-8910-2ca4227527f9", "state":"stateA", "timestamp:1465566255, "other unusefull info":"..."} 

Flink будет использовать эти события и должен каждую секунду погружаться в упругий поиск числа событий в каждом состоянии, например:

{"stateA":54, "stateB":100, ... "stateJ":34}

У меня 10 штатов: [Created, ... , Deleted] со средним жизненным циклом 15 минут. Состояние может меняться дважды в секунду. Теоретически новые состояния могут быть добавлены.

Чтобы пропускать потоки каждую секунду, я собираюсь использовать временные окна Flink https://flink.apache.org/news/2015/12/04/Introducing-windows.html

Проблема в том, что мне нужны объекты с состоянием с информацией о guid->previous-state а также stateX->count чтобы иметь возможность увеличивать / уменьшать счет при появлении нового события.

Я нашел черновик документа о паровой обработке с сохранением состояния https://cwiki.apache.org/confluence/display/FLINK/Stateful+Stream+Processing

Я новичок в обработке потоков и потоков, я еще не копался в обработке потоков с отслеживанием состояния. Для первого этапа я думаю использовать статические объекты для этого, но этот подход не будет работать, когда будет запущено несколько экземпляров flink.

Я хочу тебя спросить:

  1. Что вы думаете об этом подходе?
  2. Подходит ли flink для такого рода потоковой обработки?
  3. Каким будет ваш подход к решению этой проблемы?

Также я был бы признателен за некоторые фрагменты кода для оконного решения с отслеживанием состояния (или других решений).

Спасибо,

1 ответ

Как насчет чего-то вроде следующего?

Он использует 15-минутные окна, после чего состояние окна будет очищено. Он также использует пользовательский триггер, который оценивает окно каждую секунду. Что касается оконной операции, то существует функция ReduceFunction, которая просто сохраняет последнее состояние для каждой guid, и функция WindowFunction, которая генерирует кортеж (state, 1). Затем мы определяем это состояние и суммируем это. Я думаю, что это должно дать вам результат, который вы ищете.

val env = StreamExecutionEnvironment.getExecutionEnvironment()
val stream = env.addSource(new FlinkKafkaProducer(...))

val results = stream
  .keyBy(_.guid)
  .timeWindow(Time.minutes(15))
  .trigger(ProcessingTimeTriggerWithPeriodicFirings(1000))
  .apply(
    (e1, e2) => e2,
    (k, w, i, c: Collector[(String, Long)]) => {
      if (i.head != null) c.collect((i.head.state, 1))
    }
  )
  .keyBy(0)
  .timeWindow(Time.seconds(1))
  .sum(1)
  .addSink(new ElasticsearchSink<>(...))

env.execute("Count States")

ProcessingTimeTriggerWithPeriodicFirings определяется следующим образом:

object ProcessingTimeTriggerWithPeriodicFirings {
  def apply(intervalMs: Long) = {
    new ProcessingTimeTriggerWithPeriodicFirings(intervalMs)
  }
}

class ProcessingTimeTriggerWithPeriodicFirings(intervalMs: Long)
  extends Trigger[Event, TimeWindow] {

  private val startTimeDesc =
    new ValueStateDescriptor[Long]("start-time", classOf[Long], 0L)

  override def onElement(element: Event, timestamp: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    val startTime = ctx.getPartitionedState(startTimeDesc)
    if (startTime.value == 0) {
      startTime.update(window.getStart)
      ctx.registerProcessingTimeTimer(window.getEnd)
      ctx.registerProcessingTimeTimer(System.currentTimeMillis() + intervalMs)
    }
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    if (time == window.getEnd) {
      TriggerResult.PURGE
    }
    else {
      ctx.registerProcessingTimeTimer(time + intervalMs)
      TriggerResult.FIRE
    }
  }

  override def onEventTime(time: Long, window: TimeWindow, ctx: TriggerContext): TriggerResult = {
    TriggerResult.CONTINUE
  }
}
Другие вопросы по тегам