Как отобразить кортежи с постоянным состоянием в Trident?
Я изучаю основы Trident. Есть три метода на Trident Stream
s для агрегации кортежей в пакете, включая этот, который позволяет предварительно сформировать отображение кортежей с сохранением состояния, используя Aggregator
интерфейс. Но, к сожалению, встроенный аналог дополнительно сохраняет состояние карты, как и другие 9 перегрузок persistentAggregate()
, только с Aggregator
в качестве аргумента, нет.
Таким образом, как я могу реализовать желаемую функциональность, комбинируя низкоуровневые абстракции и инструменты Trident и Storm? Изучить API довольно сложно, потому что документации по Javadoc практически нет.
Другими словами, persistentAggregate()
методы позволяют завершить потоковую обработку обновлением некоторого постоянного состояния:
stream of tuples ---> persistent state
Я хочу обновить постоянное состояние и, между прочим, выдавать разные кортежи:
stream of tuples ------> stream of different tuples
with
persistent state
Stream.aggregate(Fields, Aggregator, Fields)
не обеспечивает отказоустойчивость:
stream of tuples ------> stream of different tuples
with
simple in-memory state
1 ответ
Вы можете создать новый поток из состояния, используя метод TridentState # newValuesStream (). Это позволит вам получить поток агрегированных значений.
В целях иллюстрации, мы можем улучшить пример в документации Trident, добавив этот метод и фильтр отладки:
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 3,
new Values("the cow jumped over the moon"),
new Values("the man went to the store and bought some candy"),
new Values("four score and seven years ago"),
new Values("how many apples can you eat"));
spout.setCycle(true);
TridentTopology topology = new TridentTopology();
topology.newStream("spout1", spout)
.each(new Fields("sentence"), new Split(), new Fields("word"))
.groupBy(new Fields("word"))
.persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))
.newValuesStream().each(new Fields("count"), new Debug());
Запуск этой топологии выведет (на консоль) агрегированные значения.
Надеюсь, поможет