Создание источника Apache Storm, который генерирует кортежи каждые X секунд
У меня есть топология, которая получает данные от брокера MQTT, и я хочу, чтобы носик вел себя так:
Создайте пакет кортежей (или список строк в одном кортеже) каждые x секунд. Как мне этого добиться? Я читал немного о штормовой трезубец, но его
IBatchSpout
кажется, не позволяет мне выпускать кортежи в пакетном режиме с определенным интервалом времени.Что должен делать носик, если нет новых данных? Он не может заблокировать поток, так как это основной поток Storm, верно?
2 ответа
Вы могли бы реализовать свой собственный носик MQTT. Для примера взгляните на MongoSpout.
Важной частью является nextTuple
метод.
Когда вызывается этот метод, Storm запрашивает, чтобы Spout испустил кортежи в выходной коллектор.Этот метод должен быть неблокирующим, поэтому, если у Spout нет кортежей, этот метод должен возвращаться. nextTuple, ack и fail все вызываются в узком цикле в одном потоке задачи spout. Когда нет кортежей, которые нужно выдавать, будет целесообразно использовать nextTuple в течение короткого промежутка времени (например, одну миллисекунду), чтобы не тратить слишком много ресурсов процессора.
Вы не должны ждать указанное время сразу, но вы могли бы реализоватьnextTuple
так что он издает только кортеж время от времени.
private static final EMISSION_PERIOD = 2000; // 2 seconds
private long lastEmission;
@Override
public void nextTuple() {
if (lastEmission == null ||
lastEmission + EMISSION_PERIOD >= System.currentMillis()) {
List<Object> tuple = pollMQTT();
if (tuple != null) {
this.collector.emit(tuple);
return;
}
}
Utils.sleep(50);
}
Обратите внимание, что я нашел носик с открытым исходным кодом MQTT. Он не выглядит готовым к производству, но вы можете использовать его в качестве отправной точки.
В дополнение к Кристиану я нашел эту реализацию для клиента Storm MQTT. Предыдущая ссылка до сих пор не разработана.