Apache storm Trident - создание топологий динамически

Есть ли способ динамически создавать топологии в трезубце? Может ли кто-нибудь привести примеры?

1 ответ

Прежде всего, вы также можете знать, что создание топологий не является частью Trident. Trident - это всего лишь API для микропакета.

И создание новых топологий является динамическим по определению. Это то, что TopologyBuilder класс делает.

Таким образом, чтобы ответить на ваш вопрос, да, можно создавать новые топологии из Trident, или из простых носиков и болтов Storm. Единственное, что вам нужно, это то, что ваша логика создания топологии должна иметь доступ к кластеру Storm (классы и другие ресурсы), который снова по определению будет удовлетворен, если вы запустите свою логику в Storm.

Последнее, что вам понадобится, - это найти способ отправить вновь созданную топологию, и это то, что StormSubmitter класс был создан для, что опять-таки (! сюрприз:)) по определениям, удовлетворяющим нахождении на вашем пути к классам, когда вы запускаете свою логику внутри Trident или обычного носика / болта.

Из любопытства, почему вы планируете это сделать? Каковы ваши требования?

Пример:

import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.StormSubmitter;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.trident.operation.TridentCollector;
import org.apache.storm.trident.spout.IBatchSpout;
import org.apache.storm.tuple.Fields;

public class DynamicTopologySpout implements IBatchSpout {

    private static final long serialVersionUID = -3269435263455830842L;

    @Override
    @SuppressWarnings("rawtypes")
    public void open(Map conf, TopologyContext context) {}

    @Override
    public void emitBatch(long batchId, TridentCollector collector) {
        if (newTopologyNeeded()) {
            TopologyBuilder builder = new TopologyBuilder();
            builder
            .setSpout("spout", new BaseRichSpout() {
                private static final long serialVersionUID = 1L;
                @Override public void declareOutputFields(OutputFieldsDeclarer declarer) {}
                @Override @SuppressWarnings("rawtypes") public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {}
                @Override public void nextTuple() {}
            }, 1)
            .setMaxSpoutPending(15)
            .setNumTasks(1);
            StormTopology topology = builder.createTopology();
            Config config = new Config();
            try {
                StormSubmitter.submitTopology("dynamic-topology", config, topology);
            } catch (Exception e) {
                e.printStackTrace();
                collector.reportError(e);
            }
        }
    }

    private boolean newTopologyNeeded() {
        // Check if topology needed ...
        return false;
    }

    @Override
    public void ack(long batchId) {}

    @Override
    public void close() {}

    @Override
    public Map<String, Object> getComponentConfiguration() { return null; }

    @Override
    public Fields getOutputFields() { return null; }

}
Другие вопросы по тегам