Можно ли обрабатывать несколько потоков в Apache Flink CEP?
Мой вопрос заключается в том, что, если у нас есть два необработанных потока событий, например, Smoke и Temperature, и мы хотим выяснить, произошло ли сложное событие, т.е. пожар, применяя операторы к необработанным потокам, можем ли мы сделать это во Flink?
Я задаю этот вопрос, потому что все примеры, которые я до сих пор видел для Flink CEP, включают только один входной поток. Пожалуйста, поправьте меня, если я ошибаюсь.
1 ответ
Краткий ответ - Да, вы можете читать и обрабатывать несколько потоков и правил срабатывания на основе типов событий из другого источника потока.
Длинный ответ - у меня было несколько похожее требование, и Мой ответ основан на предположении, что вы читаете разные потоки из разных тем кафки.
Читайте из разных тем, которые передают разные события в одном источнике:
FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
new StringSerializerToEvent(),
props);
kafkaSource.assignTimestampsAndWatermarks(new
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
.filter(Objects::nonNull);
Сериализатор считывает данные и анализирует их в общем формате - например.
@Data
public class BAMEvent {
private String keyid; //If key based partitioning is needed
private String eventName; // For different types of events
private String eventId; // Any other field you need
private long timestamp; // For event time based processing
public String toString(){
return eventName + " " + timestamp + " " + eventId + " " + correlationID;
}
}
и после этого все довольно просто, определите правила на основе имени события и сравните имя события для определения правил (Вы также можете определить сложные правила следующим образом):
Pattern.<BAMEvent>begin("first")
.where(new SimpleCondition<BAMEvent>() {
private static final long serialVersionUID = 1390448281048961616L;
@Override
public boolean filter(BAMEvent event) throws Exception {
return event.getEventName().equals("event1");
}
})
.followedBy("second")
.where(new IterativeCondition<BAMEvent>() {
private static final long serialVersionUID = -9216505110246259082L;
@Override
public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {
if (!secondEvent.getEventName().equals("event2")) {
return false;
}
for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
if (secondEvent.getEventId = firstEvent.getEventId()) {
return true;
}
}
return false;
}
})
.within(withinTimeRule);
Я надеюсь, что это дает вам идею объединить один или несколько разных потоков вместе.
Интересно, можно ли выполнить Strict chaining (вместо followedBy, если можно использовать next), потому что в данном потоке может быть много событий для определенной временной метки. Так, скажем, для времени t1-: a,b,c - эти три события наступают, а для времени t2-: a2,b2,c2 приходят, чтобы запустить двигатель. Итак, мне интересно, как мы получаем event(a).next(a2), потому что этого может никогда не быть, поскольку серия будет чем-то вроде -: a b c a2 b2 c2
однако, если модуль CEP обрабатывает события так, что он рассматривает одну метку времени как отдельное событие, тогда это имеет смысл.