Как сессионизировать / сгруппировать события в Akka Streams?
Требование заключается в том, что я хочу написать потоковое приложение Akka, которое прослушивает непрерывные события от Kafka, а затем выполняет сеанс данных события во временном интервале на основе некоторого значения идентификатора, встроенного в каждое событие.
Например, предположим, что мое окно времени составляет две минуты, и в первые две минуты я получаю четыре события ниже:
Входные данные:
{"message-domain":"1234","id":1,"aaa":"bbb"}
{"message-domain":"1234","id":2,"aaa":"bbb"}
{"message-domain":"5678","id":4,"aaa":"bbb"}
{"message-domain":"1234","id":3,"aaa":"bbb"}
Затем в выходных данных, после группировки / сеанса этих событий, у меня будет только два события на основе их значения в домене сообщений.
Выход:
{"message-domain":"1234",messsages:[{"id":1,"aaa":"bbb"},{"id":2,"aaa":"bbb"},{"id":4,"aaa":"bbb"}]}
{"message-domain":"5678",messsages:[{"id":3,"aaa":"bbb"}]}
И я хочу, чтобы это произошло в режиме реального времени. Любые предложения о том, как этого добиться?
1 ответ
Для группировки событий в пределах временного окна вы можете использовать Flow.groupedWithin
:
val maxCount : Int = Int.MaxValue
val timeWindow = FiniteDuration(2L, TimeUnit.MINUTES)
val timeWindowFlow : Flow[String, Seq[String]] =
Flow[String] groupedWithin (maxCount, timeWindow)