Двойные кортежи грозового кластера

В настоящее время я работаю над проектом, в котором я настроил кластер Storm на четырех хостах Unix.

Сама топология выглядит следующим образом:

  1. JMS Spout слушает MQ для новых сообщений
  2. JMS Spout анализирует, а затем отправляет результат в болт Эспера
  3. Затем Болт Эспера обрабатывает событие и отправляет результат в Болт JMS.
  4. Затем JMS Bolt публикует сообщение в MQ на другую тему.

Я понимаю, что Storm - это "хотя бы один раз" фреймворк. Однако, если я получаю 5 событий и передаю их на Болт Эспера для подсчета, то по какой-то причине я получаю 5 результатов подсчета в Болте JMS (все то же значение).

В идеале, я хочу получить один результат вывода, есть ли способ, как я могу сказать Storm игнорировать дубликаты кортежей?

Я думаю, что это как-то связано с настроенным мной параллелизмом, потому что он работает, как и ожидалось, когда у меня только один поток:

 TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout(JMS_DATA_SPOUT, new JMSDataSpout(),2).setNumTasks(2);
    builder.setBolt("esperBolt", new EsperBolt.Builder().build(),6).setNumTasks(6)
            .fieldsGrouping(JMS_DATA_SPOUT,new Fields("eventGrouping"));
    builder.setBolt("jmsBolt", new JMSBolt(),2).setNumTasks(2).fieldsGrouping("esperBolt", new Fields("eventName"));

Я также видел Trident для семантики "точно один раз". Однако я не совсем уверен, что это решит эту проблему.

1 ответ

Если ваш болт Эспера явно не ack() каждый кортеж в конце своего метода execute() ИЛИ не использует реализацию iBasicBolt, то каждый кортеж, который он получает, будет в конечном итоге воспроизведен вашим источником JMS Spout после истечения времени ожидания.

В качестве альтернативы, если вы просите свой болт "обрабатывать только уникальные сообщения", попробуйте добавить это поведение обработки в ваш метод execute(). Сначала он может проверить локальный кеш Guava на уникальность значений кортежей, а затем обработать соответствующим образом.

Другие вопросы по тегам