Параллелизм данных в Storm

Я прочитал о шторме Apache и сделал несколько основных уроков. Я имею в виду следующую топологию, которую я хотел бы реализовать с помощью шторма, но не уверен, как справиться с распределением данных. Бизнес-требование заключается в оценке портфеля клиентов в режиме реального времени. В упрощенном виде это включает в себя: 1) Принять живой поток рыночных цен (валют, товаров и т. Д.). 2) Для каждого тика цены рассчитать текущую прибыль каждой позиции и преобразовать ее в валюту счета клиента. и объем всех позиций на клиента и генерировать сигналы, если требуется 4) На уровне клиента расчет должен быть последовательным и атомарным / сериализованным. Т.е. все позиции должны оцениваться с каждым тиком в том порядке, в котором они были введены в систему, а итоги должны рассчитываться на основе одной и той же цены, даже если у клиента есть сотни позиций. 5) Проанализируйте объемы / тренды всех позиций в системе, сгруппированные по символам / типу клиента / стране / и т. Д.... и сделайте их доступными на некоторой панели инструментов.

Все заказы выполняются и хранятся в rdbms. Мой главный вопрос - как распределить сотни тысяч позиций по болтам Storm по разным узлам, каждый из которых обрабатывает свою собственную часть. Использование Modulo достаточно для разделения клиентов, но как я могу предоставить идентификатор каждому экземпляру болта, чтобы каждый из них обрабатывал только свою равную часть клиентов? Есть ли что-то нестандартное в Storm для этого? Другой вопрос, как сделать вышеупомянутые агрегации эффективно?

1 ответ

Решение

Ты можешь использовать fieldsGrouping для этого. Вы можете объявить поле, по которому группируются кортежи (в вашем случае id).

Я просто предполагаю, что ваш входной поток - это объект JSON с полем id и body, например

{"id":"1234","body":"some body"}

Также предположим, что ваша топология имеет один носик, два болта, а именно BoltA и BoltB.

В BoltB переопределите метод DeclareOutputFields и заполните детали.

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("id","log"));
}

И вы можете объявить топологию, как показано ниже

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("boltA", new BoltA(), 1)
       .shuffleGrouping("spout");
builder.setBolt("counterBolt", new BoltB(), 1).fieldsGrouping("boltB", new Fields("id"));

В этом случае кортежи с одинаковыми идентификаторами из boltA будет доставлен в тот же экземпляр boltB

Другие вопросы по тегам