Flink: синхронизировать / соединить два потока
У меня есть два потока, первый, с библиотекой CEP, он использует ее для определения паттернов ON-OFF. Во втором потоке находятся значения, которые я должен добавить. Я ищу способ соединения двух потоков, чтобы при получении кадра OFF я возвращал изменение состояния устройства с конечным значением суммы кадров второго типа:
Device 311 State Change:Ignition OFF Value:35.91
Мой код:
// IGNITION pattern
val ig = timedValue.filter(_._2 == "Ignition").keyBy(_._4)
val patternIG_ON: Pattern[(Long,String,String,Long), _] = Pattern.begin[(Long,String,String,Long)]("start").where(_._3 == "OFF").next("end").where(_._3 == "ON")
val patternStream: PatternStream[(Long,String,String,Long)] = CEP.pattern(ig, patternIG_ON)
val patternIG_OFF: Pattern[(Long,String,String,Long), _] = Pattern.begin[(Long,String,String,Long)]("start").where(_._3 == "ON").next("end").where(_._3 == "OFF")
val patternStreamIG_OFF: PatternStream[(Long,String,String,Long)] = CEP.pattern(timedValue, patternIG_OFF)
def selectFn(pattern : mutable.Map[String,(Long,String,String,Long)]): String = {
val startEvent = pattern.get("start").get
val endEvent = pattern.get("end").get
"Device "+startEvent._4 +" State Change: "+startEvent._2+" ON"
}
def selectFnOFF(pattern : mutable.Map[String,(Long,String,String,Long)]): String = {
val startEvent = pattern.get("start").get
val endEvent = pattern.get("end").get
"Device "+startEvent._4 +" State Change: "+startEvent._2+" OFF "
}
val patternStreamSelected = patternStream.select(selectFn(_)).print()
val suma: org.apache.flink.streaming.api.scala.DataStream[(Long, Double)] = odo.map(s => (s._4,s._3)).keyBy(_._1).mapWithState(
(in, state: Option[(Int, Double)]) => state match {
case Some((count, sum)) => ((in._1,(sum + in._2).toDouble), Some((count + 1, sum + in._2)))
case None => ((in._1, in._2), Some(1, in._2))
})
val patternStreamSelectedOFF = patternStreamIG_OFF.select(selectFnOFF(_)).connect(suma).keyBy(_._1, _._1).map(l => l + " Value:", r => r._2).print()
Обновление: мой результат:
24.2
23.5
44.010000000000005
67.05000000000001
Device 311 State Change3:Ignition ON
(311,Device 311 State Change: Ignition OFF ) Value:
первое число (24.2) с другого устройства, оно не должно появляться...
Любая идея, каков наилучший подход? Спасибо