Можно ли обрабатывать несколько потоков в Apache Flink CEP?

Мой вопрос заключается в том, что, если у нас есть два необработанных потока событий, например, Smoke и Temperature, и мы хотим выяснить, произошло ли сложное событие, т.е. пожар, применяя операторы к необработанным потокам, можем ли мы сделать это во Flink?

Я задаю этот вопрос, потому что все примеры, которые я до сих пор видел для Flink CEP, включают только один входной поток. Пожалуйста, поправьте меня, если я ошибаюсь.

1 ответ

Решение

Краткий ответ - Да, вы можете читать и обрабатывать несколько потоков и правил срабатывания на основе типов событий из другого источника потока.

Длинный ответ - у меня было несколько похожее требование, и Мой ответ основан на предположении, что вы читаете разные потоки из разных тем кафки.

Читайте из разных тем, которые передают разные события в одном источнике:

FlinkKafkaConsumer010<BAMEvent> kafkaSource = new FlinkKafkaConsumer010<>(
        Arrays.asList("topicStream1", "topicStream2", "topicStream3"),
        new StringSerializerToEvent(),
        props);

kafkaSource.assignTimestampsAndWatermarks(new 
TimestampAndWatermarkGenerator());
DataStream<BAMEvent> events = env.addSource(kafkaSource)
        .filter(Objects::nonNull);

Сериализатор считывает данные и анализирует их в общем формате - например.

@Data
public class BAMEvent {
 private String keyid;  //If key based partitioning is needed
 private String eventName; // For different types of events
 private String eventId;  // Any other field you need
 private long timestamp; // For event time based processing 

 public String toString(){
   return eventName + " " + timestamp + " " + eventId + " " + correlationID;
 }

}

и после этого все довольно просто, определите правила на основе имени события и сравните имя события для определения правил (Вы также можете определить сложные правила следующим образом):

Pattern.<BAMEvent>begin("first")
        .where(new SimpleCondition<BAMEvent>() {
          private static final long serialVersionUID = 1390448281048961616L;

          @Override
          public boolean filter(BAMEvent event) throws Exception {
            return event.getEventName().equals("event1");
          }
        })
        .followedBy("second")
        .where(new IterativeCondition<BAMEvent>() {
          private static final long serialVersionUID = -9216505110246259082L;

          @Override
          public boolean filter(BAMEvent secondEvent, Context<BAMEvent> ctx) throws Exception {

            if (!secondEvent.getEventName().equals("event2")) {
              return false;
            }

            for (BAMEvent firstEvent : ctx.getEventsForPattern("first")) {
              if (secondEvent.getEventId = firstEvent.getEventId()) {
                return true;
              }
            }
            return false;
          }
        })
        .within(withinTimeRule);

Я надеюсь, что это дает вам идею объединить один или несколько разных потоков вместе.

Интересно, можно ли выполнить Strict chaining (вместо followedBy, если можно использовать next), потому что в данном потоке может быть много событий для определенной временной метки. Так, скажем, для времени t1-: a,b,c - эти три события наступают, а для времени t2-: a2,b2,c2 приходят, чтобы запустить двигатель. Итак, мне интересно, как мы получаем event(a).next(a2), потому что этого может никогда не быть, поскольку серия будет чем-то вроде -: a b c a2 b2 c2

однако, если модуль CEP обрабатывает события так, что он рассматривает одну метку времени как отдельное событие, тогда это имеет смысл.

Другие вопросы по тегам