Storm Trident - Непрерывное излучение из агрегатора, даже если в кафке нет данных
У меня есть топология, которая берет кортежи из кафки.
Топология выглядит примерно так.
topology.newStream("kafkaSpout", spout)
.shuffle()
.each(new Fields("str"), new Filter())
.parallelismHint(5)
.each(new Fields("str"), new Function(),
new Fields("some fields"))
.parallelismHint(5)
.partitionBy(new Fields("some field"))
.partitionAggregate(new Fields("some fields"),
new SomeAggregator(),
new Fields(""some fields""))
.parallelismHint(5)
.partitionPersist(new StateFactory(),
new Fields("some filed"),
new StateUpdater());
Из документации я понимаю, что в агрегаторе метод агрегата вызывается для каждого кортежа, полный метод вызывается при обработке всех кортежей.
Я поместил отладочные sysouts в метод init и агрегат. Когда я запускаю топологию, я вижу непрерывные системные вызовы из метода init и aggregate. Ожидается ли такое поведение? (Обратите внимание, что в Кафке нет данных)