Hazelcast Jet 0.6.1- Агрегирование по нескольким полям

Текущий пример кода Hazelcast Jet 0.6.1 демонстрирует агрегирование на основе одного поля (например, тикера).

Вот ссылка.

\ Кодовых образцы \ потокового \ биржевой \ SRC \ главная \ Java\StockExchange.java

Как это можно продлить для нескольких тикеров, трейдеров и т. Д.

Вот текущий пример кода из StockExchange.java

 private static Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();

    p.drawFrom(Sources.<Trade, Integer, Trade>mapJournal(TRADES_MAP_NAME,
            alwaysTrue(), EventJournalMapEvent::getNewValue, START_FROM_CURRENT))
     .addTimestamps(Trade::getTime, 3000)
     .groupingKey(Trade::getTicker)
     .window(WindowDefinition.sliding(SLIDING_WINDOW_LENGTH_MILLIS, SLIDE_STEP_MILLIS))
     .aggregate(counting(),
             (winStart, winEnd, key, result) -> String.format("%s %5s %4d", toLocalTime(winEnd), key, result))
     .drainTo(Sinks.logger());

    return p;
}

2 ответа

Решение

Для тикера и трейдера вы можете использовать:

.groupingKey(trade -> Tuple2.tuple2(trade.getTicker(), trade.getTraderId()))

Как правило, ключ может быть чем угодно, что реализует equals а также hashCode должным образом. Tuple2 является универсальным контейнером для двух значений.

Мы также можем предоставить ключи, разделенные запятыми, для группировки. .aggregate (AggregateOperations.groupingBy (данные -> {StringBuilder stringBuilder = new StringBuilder(); stringBuilder.append (StringUtils.defaultString(data.getSource1 (). get (dataValue) + "", "")). append (",");return stringBuilder.substring(0, stringBuilder.toString (). length() - 1);

              }));
Другие вопросы по тегам