Apache storm Trident - создание топологий динамически
Есть ли способ динамически создавать топологии в трезубце? Может ли кто-нибудь привести примеры?
1 ответ
Прежде всего, вы также можете знать, что создание топологий не является частью Trident. Trident - это всего лишь API для микропакета.
И создание новых топологий является динамическим по определению. Это то, что TopologyBuilder
класс делает.
Таким образом, чтобы ответить на ваш вопрос, да, можно создавать новые топологии из Trident, или из простых носиков и болтов Storm. Единственное, что вам нужно, это то, что ваша логика создания топологии должна иметь доступ к кластеру Storm (классы и другие ресурсы), который снова по определению будет удовлетворен, если вы запустите свою логику в Storm.
Последнее, что вам понадобится, - это найти способ отправить вновь созданную топологию, и это то, что StormSubmitter
класс был создан для, что опять-таки (! сюрприз:)) по определениям, удовлетворяющим нахождении на вашем пути к классам, когда вы запускаете свою логику внутри Trident или обычного носика / болта.
Из любопытства, почему вы планируете это сделать? Каковы ваши требования?
Пример:
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;
public class DynamicTopologySpout implements IBatchSpout {
private static final long serialVersionUID = -3269435263455830842L;
@Override
@SuppressWarnings("rawtypes")
public void open(Map conf, TopologyContext context) {}
@Override
public void emitBatch(long batchId, TridentCollector collector) {
if (newTopologyNeeded()) {
TopologyBuilder builder = new TopologyBuilder();
builder
.setSpout("spout", new BaseRichSpout() {
private static final long serialVersionUID = 1L;
@Override public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
@Override public void nextTuple() {}
}, 1)
.setMaxSpoutPending(15)
.setNumTasks(1);
StormTopology topology = builder.createTopology();
Config config = new Config();
try {
StormSubmitter.submitTopology("dynamic-topology", config, topology);
} catch (Exception e) {
e.printStackTrace();
collector.reportError(e);
}
}
}
private boolean newTopologyNeeded() {
// Check if topology needed ...
return false;
}
@Override
public void ack(long batchId) {}
@Override
public void close() {}
@Override
public Map<String, Object> getComponentConfiguration() { return null; }
@Override
public Fields getOutputFields() { return null; }
}