Hazelcast Jet 0.6.1- Агрегирование по нескольким полям
Текущий пример кода Hazelcast Jet 0.6.1 демонстрирует агрегирование на основе одного поля (например, тикера).
Вот ссылка.
\ Кодовых образцы \ потокового \ биржевой \ SRC \ главная \ Java\StockExchange.java
Как это можно продлить для нескольких тикеров, трейдеров и т. Д.
Вот текущий пример кода из StockExchange.java
private static Pipeline buildPipeline() {
Pipeline p = Pipeline.create();
p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
.addTimestamps(Trade::getTime, 3000)
.groupingKey(Trade::getTicker)
.window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
.aggregate(counting(),
(winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
.drainTo(Sinks.logger());
return p;
}
2 ответа
Для тикера и трейдера вы можете использовать:
.groupingKey(trade -> Tuple2.tuple2(trade.getTicker(), trade.getTraderId()))
Как правило, ключ может быть чем угодно, что реализует equals
а также hashCode
должным образом. Tuple2
является универсальным контейнером для двух значений.
Мы также можем предоставить ключи, разделенные запятыми, для группировки. .aggregate (AggregateOperations.groupingBy (данные -> {StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append (StringUtils.defaultString(data.getSource1 (). get (dataValue) + "", "")). append (",");return stringBuilder.substring(0, stringBuilder.toString (). length() - 1);
}));