Двойные кортежи грозового кластера
В настоящее время я работаю над проектом, в котором я настроил кластер Storm на четырех хостах Unix.
Сама топология выглядит следующим образом:
- JMS Spout слушает MQ для новых сообщений
- JMS Spout анализирует, а затем отправляет результат в болт Эспера
- Затем Болт Эспера обрабатывает событие и отправляет результат в Болт JMS.
- Затем JMS Bolt публикует сообщение в MQ на другую тему.
Я понимаю, что Storm - это "хотя бы один раз" фреймворк. Однако, если я получаю 5 событий и передаю их на Болт Эспера для подсчета, то по какой-то причине я получаю 5 результатов подсчета в Болте JMS (все то же значение).
В идеале, я хочу получить один результат вывода, есть ли способ, как я могу сказать Storm игнорировать дубликаты кортежей?
Я думаю, что это как-то связано с настроенным мной параллелизмом, потому что он работает, как и ожидалось, когда у меня только один поток:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(),2).setNumTasks(2);
builder.setBolt("esperBolt", new EsperBolt.Builder().build(),6).setNumTasks(6)
.fieldsGrouping(JMS_DATA_SPOUT,new Fields("eventGrouping"));
builder.setBolt("jmsBolt", new JMSBolt(),2).setNumTasks(2).fieldsGrouping("esperBolt", new Fields("eventName"));
Я также видел Trident для семантики "точно один раз". Однако я не совсем уверен, что это решит эту проблему.
1 ответ
Если ваш болт Эспера явно не ack() каждый кортеж в конце своего метода execute() ИЛИ не использует реализацию iBasicBolt, то каждый кортеж, который он получает, будет в конечном итоге воспроизведен вашим источником JMS Spout после истечения времени ожидания.
В качестве альтернативы, если вы просите свой болт "обрабатывать только уникальные сообщения", попробуйте добавить это поведение обработки в ваш метод execute(). Сначала он может проверить локальный кеш Guava на уникальность значений кортежей, а затем обработать соответствующим образом.