Apache Storm spout прекращает излучать сообщения из носика

Мы долго боролись с этим вопросом. Короче говоря, наша топология шторма прекращает излучать сообщения из носика через некоторое время случайным образом. У нас есть автоматический сценарий, который повторно развертывает топологию каждый день в 06:00 UTC после завершения обновления основных данных.

За последние 2 недели наша топология перестала отправлять сообщения 3 раза в поздние часы UTC (с 22:00 до 02:00). Он появляется только после перезагрузки, которая происходит около 06:00 UTC.

Я искал много ответов и блогов, но не мог узнать, что здесь происходит. У нас есть топология без привязки, которую мы выбрали 3-4 года назад. Мы начали с 0.9.2 и сейчас находимся на 1.1.0.

Я проверил все виды журналов, и я на 100% уверен, что nextTuple() Метод для контроллера не вызывается, и в системе не происходит никаких исключений, которые могут вызвать это. Я также проверил все виды журналов, которые мы накапливаем, и нет ни одного журнала ошибок или предупреждений, объясняющих внезапную остановку. Журналы INFO также не так полезны. Нет ничего, что могло бы быть связано с этой проблемой в рабочих журналах или журналах супервизора или журналах nimbus.

Вот так выглядит наш класс spout:Controller.java

public class Controller implements IRichSpout {

    SpoutOutputCollector _collector;
    Calendar LAST_RUN = null;
    List<ControllerMessage> msgList;

    /**
     * It is to open the spout
     */
      public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        msgList= new ArrayList<ControllerMessage>();

        MongoIndexingHandler mongoIndexingHandler = new MongoIndexingHandler();
        mongoIndexingHandler.createMongoIndexes();

      }

      /**
       * It executes the next tuple
       */



    @Override
    public void nextTuple() {
           Map<String, Object> logMap = new HashMap<>();
            logMap.put("BEGIN", new Date());

        try {
            TriggerHandler thandler = new TriggerHandler();
            if (msgList.size() == 0) {
                List<ControllerMessage> mList = thandler.getControllerMessage(new Date());
                msgList = mList;
            }

            if (msgList.size() > 0) {
                ControllerMessage message = msgList.get(0);
                if(thandler.fire(message.getFireTime())) {
                    Util.log(message, "CONTROLLER_LOGS", message.getTime(), new Date());
                    msgList.remove(0);
                    _collector.emit(new Values(message));
                }

            }
            else{
                Utils.sleep(1000);
            }

        } catch (Exception e) {
            _collector.reportError(e);

            Util.exLog(e, "EXECUTOR_ERROR", new Date(), "nextTuple()",Controller.class);
        } 
    }

      /**
       * It acknowledges the messages
       */
      @Override
      public void ack(Object id) {

      }
      /**
       * It tells failed messages
       */
      @Override
      public void fail(Object id) {

      }
     /**
      * It declares the message name
      */
      @Override
      public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("SPOUT_MESSAGE"));
      }

    @Override
    public void activate() {

    }

    @Override
    public void close() {

    }

    @Override
    public void deactivate() {

    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }


}

и это класс топологии: DiagnosticTopology.java

public class DiagnosticTopology {

    public static void main(String[] args) throws Exception {
        int gSize = (null != args && args.length > 0) ? Integer.parseInt(args[0]) : 2;
        int sSize = (null != args && args.length > 1) ? Integer.parseInt(args[1]) : 128;
        int sMSize = (null != args && args.length > 2) ? Integer.parseInt(args[2]) : 16;
        int aGSize = (null != args && args.length > 3) ? Integer.parseInt(args[3]) : 16;
        int rSize = (null != args && args.length > 4) ? Integer.parseInt(args[4]) : 64;
        int rMSize = (null != args && args.length > 5) ? Integer.parseInt(args[5]) : 16;
        int dMSize = (null != args && args.length > 6) ? Integer.parseInt(args[6]) : 8;
        int wSize = (null != args && args.length > 7) ? Integer.parseInt(args[7]) : 16;
        String topologyName = (null != args && args.length > 8) ? args[8] : "DIAGNOSTIC";

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("controller", new Controller(), 1);
        builder.setBolt("generator", new GeneratorBolt(), gSize).shuffleGrouping("controller");
        builder.setBolt("scraping", new ScrapingBolt(), sSize).shuffleGrouping("generator");
        builder.setBolt("smongo", new MongoBolt(), sMSize).shuffleGrouping("scraping");
        builder.setBolt("aggregation", new AggregationBolt(), aGSize).shuffleGrouping("scraping");
        builder.setBolt("rule", new RuleBolt(), rSize).shuffleGrouping("smongo");
        builder.setBolt("rmongo", new RMongoBolt(), rMSize).shuffleGrouping("rule");
        builder.setBolt("dstatus", new DeviceStatusBolt(), dMSize).shuffleGrouping("rule");

        builder.setSpout("trigger", new TriggerSpout(), 1);
        builder.setBolt("job", new JobTriggerBolt(), 4).shuffleGrouping("trigger");

        Config conf = new Config();
        conf.setDebug(false);
        conf.setNumWorkers(wSize);

        StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
    }
}

У нас есть достаточно хорошие серверы (Xeon, 8-ядерные, 32 ГБ и флеш-накопители) для производственной и тестовой среды, и нет внешних факторов, которые могут вызвать эту проблему, поскольку обработка исключений повсюду в коде.

Когда это происходит, кажется, что все внезапно прекратилось, и нет никаких следов того, почему это произошло.

Любая помощь высоко ценится!

2 ответа

Решение

Я не знаю, в чем причина вашей проблемы, но я бы порекомендовал вам начать с проверки, решает ли обновление до последней версии Storm эту проблему. Мне известны как минимум две проблемы, связанные с тем, что рабочие потоки умирают и не возвращаются. https://issues.apache.org/jira/browse/STORM-1750 https://issues.apache.org/jira/browse/STORM-21941750 исправлен в 1.1.0, но 2194 не исправлен до 1.1.1.

Если обновление не решит проблему для вас, вы можете отладить ее, выполнив следующие действия.

В следующий раз, когда ваша топология зависнет, откройте Storm UI и найдите свой носик. Он покажет список исполнителей, управляющих этим носиком, а также рабочие, отвечающие за их выполнение. Выберите одного из рабочих, где исполнитель носа ничего не излучает. Откройте оболочку на машине, на которой работает этот рабочий, и найдите идентификатор процесса рабочей JVM. Вы можете сделать это легко с jps -m,

Пример выходных данных, показывающий рабочую JVM с портом 6701 на моей локальной машине с pid 7592:

7592 Рабочий тест-2-1520361882 d24dc55d-76c7-4cc6-93fa-2663fcdcb1ba-10.0.75.1 6701 f7b6f8e4-6c87-47ca-a7b7-655009b6c62a

Запустите дамп потока, выполнив kill -3 <pid>или используйте jstack <pid> Если вы предпочитаете.

В дампе потока вы должны быть в состоянии найти поток исполнителя, который висит. Например, когда я делаю дамп потока для топологии с носиком под названием "слово", где один из исполнителей носителей имеет номер 13, я вижу

редактирование: переполнение стека не позволит мне опубликовать трассировку стека, потому что эвристический поиск неформатированного кода плох. Я, вероятно, потратил столько же времени на публикацию трассировки стека, сколько и на написание оригинального ответа, поэтому я не могу продолжать попытки. Вот след, который должен был быть здесь https://pastebin.com/2Sz5kkQ1

который показывает мне, что исполнитель 13 в настоящее время делает. В этом случае он спит во время вызова nextTuple.

Если вы можете узнать, что делает ваш зависший исполнитель, вы должны быть гораздо лучше подготовлены к решению проблемы или сообщить об ошибке в Storm.

Мы наблюдали это в нашем приложении, где у нас был очень загруженный процессор, а все остальные потоки ожидали своей очереди. Когда мы попытались найти основную причину, используя JVisualVM для проверки использования ресурсов, мы обнаружили, что некоторые функции в некоторых болтах приводили к большим накладным расходам и времени процессора. Пожалуйста, проверьте через. любой инструмент профилирования, если в критическом пути ЦП метода nextTuple() есть заблокированные потоки или вы получаете какие-либо данные для этого из апстрима.

Другие вопросы по тегам