Apache Storm spout прекращает излучать сообщения из носика
Мы долго боролись с этим вопросом. Короче говоря, наша топология шторма прекращает излучать сообщения из носика через некоторое время случайным образом. У нас есть автоматический сценарий, который повторно развертывает топологию каждый день в 06:00 UTC после завершения обновления основных данных.
За последние 2 недели наша топология перестала отправлять сообщения 3 раза в поздние часы UTC (с 22:00 до 02:00). Он появляется только после перезагрузки, которая происходит около 06:00 UTC.
Я искал много ответов и блогов, но не мог узнать, что здесь происходит. У нас есть топология без привязки, которую мы выбрали 3-4 года назад. Мы начали с 0.9.2 и сейчас находимся на 1.1.0.
Я проверил все виды журналов, и я на 100% уверен, что nextTuple()
Метод для контроллера не вызывается, и в системе не происходит никаких исключений, которые могут вызвать это. Я также проверил все виды журналов, которые мы накапливаем, и нет ни одного журнала ошибок или предупреждений, объясняющих внезапную остановку. Журналы INFO также не так полезны. Нет ничего, что могло бы быть связано с этой проблемой в рабочих журналах или журналах супервизора или журналах nimbus.
Вот так выглядит наш класс spout:Controller.java
public class Controller implements IRichSpout {
SpoutOutputCollector _collector;
Calendar LAST_RUN = null;
List<ControllerMessage> msgList;
/**
* It is to open the spout
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
msgList= new ArrayList<ControllerMessage>();
MongoIndexingHandler mongoIndexingHandler = new MongoIndexingHandler();
mongoIndexingHandler.createMongoIndexes();
}
/**
* It executes the next tuple
*/
@Override
public void nextTuple() {
Map<String, Object> logMap = new HashMap<>();
logMap.put("BEGIN", new Date());
try {
TriggerHandler thandler = new TriggerHandler();
if (msgList.size() == 0) {
List<ControllerMessage> mList = thandler.getControllerMessage(new Date());
msgList = mList;
}
if (msgList.size() > 0) {
ControllerMessage message = msgList.get(0);
if(thandler.fire(message.getFireTime())) {
Util.log(message, "CONTROLLER_LOGS", message.getTime(), new Date());
msgList.remove(0);
_collector.emit(new Values(message));
}
}
else{
Utils.sleep(1000);
}
} catch (Exception e) {
_collector.reportError(e);
Util.exLog(e, "EXECUTOR_ERROR", new Date(), "nextTuple()",Controller.class);
}
}
/**
* It acknowledges the messages
*/
@Override
public void ack(Object id) {
}
/**
* It tells failed messages
*/
@Override
public void fail(Object id) {
}
/**
* It declares the message name
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("SPOUT_MESSAGE"));
}
@Override
public void activate() {
}
@Override
public void close() {
}
@Override
public void deactivate() {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
и это класс топологии: DiagnosticTopology.java
public class DiagnosticTopology {
public static void main(String[] args) throws Exception {
int gSize = (null != args && args.length > 0) ? Integer.parseInt(args[0]) : 2;
int sSize = (null != args && args.length > 1) ? Integer.parseInt(args[1]) : 128;
int sMSize = (null != args && args.length > 2) ? Integer.parseInt(args[2]) : 16;
int aGSize = (null != args && args.length > 3) ? Integer.parseInt(args[3]) : 16;
int rSize = (null != args && args.length > 4) ? Integer.parseInt(args[4]) : 64;
int rMSize = (null != args && args.length > 5) ? Integer.parseInt(args[5]) : 16;
int dMSize = (null != args && args.length > 6) ? Integer.parseInt(args[6]) : 8;
int wSize = (null != args && args.length > 7) ? Integer.parseInt(args[7]) : 16;
String topologyName = (null != args && args.length > 8) ? args[8] : "DIAGNOSTIC";
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("controller", new Controller(), 1);
builder.setBolt("generator", new GeneratorBolt(), gSize).shuffleGrouping("controller");
builder.setBolt("scraping", new ScrapingBolt(), sSize).shuffleGrouping("generator");
builder.setBolt("smongo", new MongoBolt(), sMSize).shuffleGrouping("scraping");
builder.setBolt("aggregation", new AggregationBolt(), aGSize).shuffleGrouping("scraping");
builder.setBolt("rule", new RuleBolt(), rSize).shuffleGrouping("smongo");
builder.setBolt("rmongo", new RMongoBolt(), rMSize).shuffleGrouping("rule");
builder.setBolt("dstatus", new DeviceStatusBolt(), dMSize).shuffleGrouping("rule");
builder.setSpout("trigger", new TriggerSpout(), 1);
builder.setBolt("job", new JobTriggerBolt(), 4).shuffleGrouping("trigger");
Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(wSize);
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
}
}
У нас есть достаточно хорошие серверы (Xeon, 8-ядерные, 32 ГБ и флеш-накопители) для производственной и тестовой среды, и нет внешних факторов, которые могут вызвать эту проблему, поскольку обработка исключений повсюду в коде.
Когда это происходит, кажется, что все внезапно прекратилось, и нет никаких следов того, почему это произошло.
Любая помощь высоко ценится!
2 ответа
Я не знаю, в чем причина вашей проблемы, но я бы порекомендовал вам начать с проверки, решает ли обновление до последней версии Storm эту проблему. Мне известны как минимум две проблемы, связанные с тем, что рабочие потоки умирают и не возвращаются. https://issues.apache.org/jira/browse/STORM-1750 https://issues.apache.org/jira/browse/STORM-21941750 исправлен в 1.1.0, но 2194 не исправлен до 1.1.1.
Если обновление не решит проблему для вас, вы можете отладить ее, выполнив следующие действия.
В следующий раз, когда ваша топология зависнет, откройте Storm UI и найдите свой носик. Он покажет список исполнителей, управляющих этим носиком, а также рабочие, отвечающие за их выполнение. Выберите одного из рабочих, где исполнитель носа ничего не излучает. Откройте оболочку на машине, на которой работает этот рабочий, и найдите идентификатор процесса рабочей JVM. Вы можете сделать это легко с jps -m
,
Пример выходных данных, показывающий рабочую JVM с портом 6701 на моей локальной машине с pid 7592:
7592 Рабочий тест-2-1520361882 d24dc55d-76c7-4cc6-93fa-2663fcdcb1ba-10.0.75.1 6701 f7b6f8e4-6c87-47ca-a7b7-655009b6c62a
Запустите дамп потока, выполнив kill -3 <pid>
или используйте jstack <pid>
Если вы предпочитаете.
В дампе потока вы должны быть в состоянии найти поток исполнителя, который висит. Например, когда я делаю дамп потока для топологии с носиком под названием "слово", где один из исполнителей носителей имеет номер 13, я вижу
редактирование: переполнение стека не позволит мне опубликовать трассировку стека, потому что эвристический поиск неформатированного кода плох. Я, вероятно, потратил столько же времени на публикацию трассировки стека, сколько и на написание оригинального ответа, поэтому я не могу продолжать попытки. Вот след, который должен был быть здесь https://pastebin.com/2Sz5kkQ1
который показывает мне, что исполнитель 13 в настоящее время делает. В этом случае он спит во время вызова nextTuple.
Если вы можете узнать, что делает ваш зависший исполнитель, вы должны быть гораздо лучше подготовлены к решению проблемы или сообщить об ошибке в Storm.
Мы наблюдали это в нашем приложении, где у нас был очень загруженный процессор, а все остальные потоки ожидали своей очереди. Когда мы попытались найти основную причину, используя JVisualVM для проверки использования ресурсов, мы обнаружили, что некоторые функции в некоторых болтах приводили к большим накладным расходам и времени процессора. Пожалуйста, проверьте через. любой инструмент профилирования, если в критическом пути ЦП метода nextTuple() есть заблокированные потоки или вы получаете какие-либо данные для этого из апстрима.