Как выразить логику дедупликации событий при обработке потока Сиддхи
Привет: мне нужна следующая логика дедупликации, которая будет реализована при обработке потока Сиддхи. Предположим, у меня есть InputStream, и я хочу создать OutputStream следующим образом:
(1) когда событие является первым (поскольку запускается механизм обработки событий) в InputStream, вставьте событие в OutputStream.
(2) если событие с такой же подписью, например, с тем же именем события, поступает в течение 2-минутных окон, мы считаем, что событие идентично, и мы НЕ должны вставлять событие в OutputStream. В противном случае мы должны вставить событие в OutputStream.
Я пытался использовать шаблон событий, чтобы сделать фильтрацию. Тем не менее, я не могу найти, что я могу выразить "логику отрицания" в сиддхи, то есть, если (нет ( e1 -> e2 с той же сигнатурой в 2-минутном окне)). Есть ли умный способ выполнить такую логику дедупликации событий? Обратите внимание, что дедупликация событий является очень распространенным выражением, необходимым для обработки событий.
Если бы я реализовал это на Java, это относительно просто. Я создам хэш-таблицу. Когда приходит первое событие, я регистрирую его в хэш-памяти и устанавливаю приемлемое время события для этого зарегистрированного события на 2 минуты позже. Когда приходит следующее событие, я просматриваю хеш-таблицу и сравниваю допустимое время полученного события с моим текущим временем события. Если текущее время меньше допустимого времени, я не буду рассматривать его как выходное событие. Вместо реализации Java, я предпочитаю иметь декларативное решение, реализованное в запросе потоковой обработки Сиддхи, если это возможно.
1 ответ
Вы можете использовать таблицу в памяти и достичь этого; Пожалуйста, найдите образец ниже; это очень похоже на ваш подход с Java.
define stream InputStream (event_id string, data string);
define stream OutputStream (event_id string, data string);
define table ProcessedEvents (event_id string);
from InputStream[not(ProcessedEvents.event_id == event_id in ProcessedEvents)]
insert into OutputStream ;
from OutputStream
select event_id
insert into ProcessedEvents ;
from OutputStream#window.time(2 sec)
select event_id
insert expired events into PurgeStream ;
from PurgeStream
delete ProcessedEvents
on ProcessedEvents.event_id == event_id ;