Flink: синхронизировать / соединить два потока

У меня есть два потока, первый, с библиотекой CEP, он использует ее для определения паттернов ON-OFF. Во втором потоке находятся значения, которые я должен добавить. Я ищу способ соединения двух потоков, чтобы при получении кадра OFF я возвращал изменение состояния устройства с конечным значением суммы кадров второго типа:

Device 311 State Change:Ignition OFF  Value:35.91

Мой код:

// IGNITION pattern
val ig = timedValue.filter(_._2 == "Ignition").keyBy(_._4)
val patternIG_ON: Pattern[(Long,String,String,Long), _] = Pattern.begin[(Long,String,String,Long)]("start").where(_._3 == "OFF").next("end").where(_._3 == "ON")
val patternStream: PatternStream[(Long,String,String,Long)] = CEP.pattern(ig, patternIG_ON)
val patternIG_OFF: Pattern[(Long,String,String,Long), _] = Pattern.begin[(Long,String,String,Long)]("start").where(_._3 == "ON").next("end").where(_._3 == "OFF")
val patternStreamIG_OFF: PatternStream[(Long,String,String,Long)] = CEP.pattern(timedValue, patternIG_OFF)

def selectFn(pattern : mutable.Map[String,(Long,String,String,Long)]): String = {
  val startEvent = pattern.get("start").get
  val endEvent = pattern.get("end").get
  "Device "+startEvent._4 +" State Change: "+startEvent._2+" ON"
}
def selectFnOFF(pattern : mutable.Map[String,(Long,String,String,Long)]): String = {
  val startEvent = pattern.get("start").get
  val endEvent = pattern.get("end").get
  "Device "+startEvent._4 +" State Change: "+startEvent._2+" OFF "
}

val patternStreamSelected = patternStream.select(selectFn(_)).print()
val suma: org.apache.flink.streaming.api.scala.DataStream[(Long, Double)] = odo.map(s => (s._4,s._3)).keyBy(_._1).mapWithState(
  (in, state: Option[(Int, Double)]) => state match {
    case Some((count, sum)) => ((in._1,(sum + in._2).toDouble), Some((count + 1, sum + in._2)))
    case None => ((in._1, in._2), Some(1, in._2))
  })
val patternStreamSelectedOFF = patternStreamIG_OFF.select(selectFnOFF(_)).connect(suma).keyBy(_._1, _._1).map(l => l + " Value:", r => r._2).print()

Обновление: мой результат:

24.2
23.5
44.010000000000005
67.05000000000001
Device 311 State Change3:Ignition ON
(311,Device 311 State Change: Ignition OFF ) Value:

первое число (24.2) с другого устройства, оно не должно появляться...

Любая идея, каков наилучший подход? Спасибо

0 ответов

Другие вопросы по тегам