Штормовая группировка по нескольким полям
Что я должен сделать, это сгруппировать поток по двум полям ("remote-client-ip", "request-params"
) и подсчитать количество кортежей в каждой группе. И объединить их в карту. Вот моя топология:
topology.newStream("kafka-spout-stream-1", repeatSpout)
.each(new Fields("str"), new URLParser(), new Fields(fieldNames))
.each(new Fields("remote-client-ip", "request-params"), new HTTPParameterExtractor(), new Fields("query-string"))
.groupBy(new Fields("remote-client-ip", "query-string"))
.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))
.groupBy(new Fields("remote-client-ip"))
.persistentAggregate(new MemoryMapState.Factory(), new UserQueryStringCombiner(), new Fields("user-word-count-list"));
Но после отладки я обнаружил, что поток данных сначала заблокирован groupBy()
, которая является группировкой по нескольким полям. Я ничего не казнил за Count()
в последующем сводном отчете.
Таким образом, я думаю, что неправильно понимаю некоторую концепцию о взаимодействии между многопольными группировкой и агрегацией.
Пожалуйста, дайте мне знать, правильны ли мои предположения или нет. Спасибо!
1 ответ
Решение
Вы группируете уже сгруппированные поля с Aggregate()
функция в вашей топологии. Попробуй это:
.aggregate(new Count(), new Fields("user-word-count"))
Вместо этого:
.aggregate(new Fields("remote-client-ip", "query-string"), new Count(), new Fields("user-word-count"))