Как выразить логику дедупликации событий при обработке потока Сиддхи

Привет: мне нужна следующая логика дедупликации, которая будет реализована при обработке потока Сиддхи. Предположим, у меня есть InputStream, и я хочу создать OutputStream следующим образом:

(1) когда событие является первым (поскольку запускается механизм обработки событий) в InputStream, вставьте событие в OutputStream.

(2) если событие с такой же подписью, например, с тем же именем события, поступает в течение 2-минутных окон, мы считаем, что событие идентично, и мы НЕ должны вставлять событие в OutputStream. В противном случае мы должны вставить событие в OutputStream.

Я пытался использовать шаблон событий, чтобы сделать фильтрацию. Тем не менее, я не могу найти, что я могу выразить "логику отрицания" в сиддхи, то есть, если (нет ( e1 -> e2 с той же сигнатурой в 2-минутном окне)). Есть ли умный способ выполнить такую ​​логику дедупликации событий? Обратите внимание, что дедупликация событий является очень распространенным выражением, необходимым для обработки событий.

Если бы я реализовал это на Java, это относительно просто. Я создам хэш-таблицу. Когда приходит первое событие, я регистрирую его в хэш-памяти и устанавливаю приемлемое время события для этого зарегистрированного события на 2 минуты позже. Когда приходит следующее событие, я просматриваю хеш-таблицу и сравниваю допустимое время полученного события с моим текущим временем события. Если текущее время меньше допустимого времени, я не буду рассматривать его как выходное событие. Вместо реализации Java, я предпочитаю иметь декларативное решение, реализованное в запросе потоковой обработки Сиддхи, если это возможно.

1 ответ

Вы можете использовать таблицу в памяти и достичь этого; Пожалуйста, найдите образец ниже; это очень похоже на ваш подход с Java.

    define stream InputStream (event_id string, data string);  
    define stream OutputStream (event_id string, data string);  
    define table ProcessedEvents (event_id string);

    from InputStream[not(ProcessedEvents.event_id == event_id in ProcessedEvents)]  
    insert into OutputStream ; 

    from OutputStream  
    select event_id  
    insert into ProcessedEvents ; 

    from OutputStream#window.time(2 sec)  
    select event_id  
    insert expired events into PurgeStream ; 

    from PurgeStream  
    delete ProcessedEvents  
       on ProcessedEvents.event_id == event_id ;
Другие вопросы по тегам