Параллелизм данных в Storm
Я прочитал о шторме Apache и сделал несколько основных уроков. Я имею в виду следующую топологию, которую я хотел бы реализовать с помощью шторма, но не уверен, как справиться с распределением данных. Бизнес-требование заключается в оценке портфеля клиентов в режиме реального времени. В упрощенном виде это включает в себя: 1) Принять живой поток рыночных цен (валют, товаров и т. Д.). 2) Для каждого тика цены рассчитать текущую прибыль каждой позиции и преобразовать ее в валюту счета клиента. и объем всех позиций на клиента и генерировать сигналы, если требуется 4) На уровне клиента расчет должен быть последовательным и атомарным / сериализованным. Т.е. все позиции должны оцениваться с каждым тиком в том порядке, в котором они были введены в систему, а итоги должны рассчитываться на основе одной и той же цены, даже если у клиента есть сотни позиций. 5) Проанализируйте объемы / тренды всех позиций в системе, сгруппированные по символам / типу клиента / стране / и т. Д.... и сделайте их доступными на некоторой панели инструментов.
Все заказы выполняются и хранятся в rdbms. Мой главный вопрос - как распределить сотни тысяч позиций по болтам Storm по разным узлам, каждый из которых обрабатывает свою собственную часть. Использование Modulo достаточно для разделения клиентов, но как я могу предоставить идентификатор каждому экземпляру болта, чтобы каждый из них обрабатывал только свою равную часть клиентов? Есть ли что-то нестандартное в Storm для этого? Другой вопрос, как сделать вышеупомянутые агрегации эффективно?
1 ответ
Ты можешь использовать fieldsGrouping
для этого. Вы можете объявить поле, по которому группируются кортежи (в вашем случае id
).
Я просто предполагаю, что ваш входной поток - это объект JSON с полем id и body, например
{"id":"1234","body":"some body"}
Также предположим, что ваша топология имеет один носик, два болта, а именно BoltA и BoltB.
В BoltB переопределите метод DeclareOutputFields и заполните детали.
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("id","log"));
}
И вы можете объявить топологию, как показано ниже
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", spout, 1);
builder.setBolt("boltA", new BoltA(), 1)
.shuffleGrouping("spout");
builder.setBolt("counterBolt", new BoltB(), 1).fieldsGrouping("boltB", new Fields("id"));
В этом случае кортежи с одинаковыми идентификаторами из boltA
будет доставлен в тот же экземпляр boltB