Как сессионизировать / сгруппировать события в Akka Streams?

Требование заключается в том, что я хочу написать потоковое приложение Akka, которое прослушивает непрерывные события от Kafka, а затем выполняет сеанс данных события во временном интервале на основе некоторого значения идентификатора, встроенного в каждое событие.

Например, предположим, что мое окно времени составляет две минуты, и в первые две минуты я получаю четыре события ниже:

Входные данные:

{"message-domain":"1234","id":1,"aaa":"bbb"}
{"message-domain":"1234","id":2,"aaa":"bbb"}
{"message-domain":"5678","id":4,"aaa":"bbb"}
{"message-domain":"1234","id":3,"aaa":"bbb"}

Затем в выходных данных, после группировки / сеанса этих событий, у меня будет только два события на основе их значения в домене сообщений.

Выход:

{"message-domain":"1234",messsages:[{"id":1,"aaa":"bbb"},{"id":2,"aaa":"bbb"},{"id":4,"aaa":"bbb"}]}
{"message-domain":"5678",messsages:[{"id":3,"aaa":"bbb"}]}

И я хочу, чтобы это произошло в режиме реального времени. Любые предложения о том, как этого добиться?

1 ответ

Для группировки событий в пределах временного окна вы можете использовать Flow.groupedWithin:

val maxCount : Int = Int.MaxValue

val timeWindow = FiniteDuration(2L, TimeUnit.MINUTES)

val timeWindowFlow : Flow[String, Seq[String]] =
  Flow[String] groupedWithin (maxCount, timeWindow)
Другие вопросы по тегам