Простой пример Scala API для CEP не показывает вывод
Я программирую простой пример для тестирования нового Scala API для CEP во Flink, используя последнюю версию Github для 1.1-SNAPSHOT.
Шаблон является только проверкой значения и выводит одну строку в качестве результата для каждого сопоставленного шаблона. Код выглядит следующим образом:
val pattern : Pattern[(String, Long, Int), _] = Pattern.begin("start").where(_._3 < 4)
val cepEventAlert = CEP.pattern(streamingAlert, pattern)
def selectFn(pattern : mutable.Map[String, (String, Long, Int)]): String = {
val startEvent = pattern.get("start").get
"Alerta:"+startEvent._1+": Pattern"
}
val patternStreamSelected = cepEventAlert.select(selectFn(_))
patternStreamSelected.print()
Он компилируется и запускается под 1.1-SNAPSHOT без проблем, но вывод менеджера заданий не показывает никаких признаков этого print(). Даже ослабление условий шаблона и установка только "начала" (Принятие всех событий) не дает абсолютно ничего.
Кроме того, при попытке добавить этапы код не компилируется. Если я изменю шаблон на (Нахождение двух последовательных событий с 3-м полем меньше 4):
Pattern.begin("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))
Затем компилятор выдает:
error: missing parameter type for expanded function ((x$4) => x$4._3.$less(4))
Ошибка отображается на первом месте, где () после этапа "start". Я попытался явно установить тип параметра с помощью:
(x: (String, Long, Int)) => x._3 < 4
Таким образом, он снова компилируется, но когда он запускается на Flink, вывод не отображается. StreamingAlert - это Scala DataStream[(String, Long, Int)], а в других частях кода я могу фильтровать с помощью _._ < 4
без проблем и вывод кажется правильным.
1 ответ
print()
Вызов API в API потоковой передачи не запускает активное выполнение. Вам все еще нужно позвонить env.execute()
в конце вашей программы.
Когда вы определяете свой шаблон, вы должны указать тип события где-нибудь. Либо вы делаете это, как вы сделали, или вы делаете это с помощью параметра типа для begin
:
Pattern.begin[(String, Long, Int)]("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))