Простой пример Scala API для CEP не показывает вывод

Я программирую простой пример для тестирования нового Scala API для CEP во Flink, используя последнюю версию Github для 1.1-SNAPSHOT.

Шаблон является только проверкой значения и выводит одну строку в качестве результата для каждого сопоставленного шаблона. Код выглядит следующим образом:

val pattern : Pattern[(String, Long, Int), _] = Pattern.begin("start").where(_._3 < 4)

val cepEventAlert = CEP.pattern(streamingAlert, pattern)

def selectFn(pattern : mutable.Map[String, (String, Long, Int)]): String = {
    val startEvent = pattern.get("start").get
    "Alerta:"+startEvent._1+": Pattern"
}

val patternStreamSelected = cepEventAlert.select(selectFn(_))

patternStreamSelected.print()

Он компилируется и запускается под 1.1-SNAPSHOT без проблем, но вывод менеджера заданий не показывает никаких признаков этого print(). Даже ослабление условий шаблона и установка только "начала" (Принятие всех событий) не дает абсолютно ничего.

Кроме того, при попытке добавить этапы код не компилируется. Если я изменю шаблон на (Нахождение двух последовательных событий с 3-м полем меньше 4):

Pattern.begin("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))

Затем компилятор выдает:

error: missing parameter type for expanded function ((x$4) => x$4._3.$less(4))

Ошибка отображается на первом месте, где () после этапа "start". Я попытался явно установить тип параметра с помощью:

(x: (String, Long, Int)) => x._3 < 4

Таким образом, он снова компилируется, но когда он запускается на Flink, вывод не отображается. StreamingAlert - это Scala DataStream[(String, Long, Int)], а в других частях кода я могу фильтровать с помощью _._ < 4 без проблем и вывод кажется правильным.

1 ответ

print() Вызов API в API потоковой передачи не запускает активное выполнение. Вам все еще нужно позвонить env.execute() в конце вашей программы.

Когда вы определяете свой шаблон, вы должны указать тип события где-нибудь. Либо вы делаете это, как вы сделали, или вы делаете это с помощью параметра типа для begin:

Pattern.begin[(String, Long, Int)]("start").where(_._3 < 4).next("end").where(_._3 < 4).within(Time.seconds(30))
Другие вопросы по тегам