Что такое состояние трезубца в шторме?
Я новичок в Trident в шторме. Я ломаю голову над TridentState. Насколько я понимаю, трезубец поддерживает состояние (т. Е. Метаданные) для каждого пакета (независимо от того, полностью ли обработаны все кортежи в пакете с сохранением идентификатора транзакции в базе данных), и я не совсем уверен, что делает следующий оператор
TridentState urlToTweeters =
topology.newStaticState(getUrlToTweetersState());
Может кто-нибудь объяснить, что на самом деле происходит, когда мы определяем приведенный выше код?
2 ответа
Я надеюсь, что никогда не поздно ответить, по крайней мере, кто-то еще может найти мой ответ полезным:)
Так, topology.newStaticState()
является абстракцией Trident для запрашиваемого хранилища данных Параметр для newStaticState()
должна быть реализация - на основе контракта метода - storm.trident.state.StateFactory
, Завод, в свою очередь, должен реализовать makeState()
метод, возвращающий экземпляр storm.trident.state.State
, Однако, если вы планируете запросить свое состояние, вы должны вернуть storm.trident.state.map.ReadOnlyMapState
вместо этого, так как обычный storm.trident.state.State
не имеет методов для запроса фактического источника данных (вы фактически получите исключение приведения класса, если попытаетесь использовать что-либо кроме ReadOnlyMapState
).
Итак, давайте попробуем!
Фиктивная реализация состояния:
public static class ExampleStaticState implements ReadOnlyMapState<String> {
private final Map<String, String> dataSourceStub;
public ExampleStaticState() {
dataSourceStub = new HashMap<>();
dataSourceStub.put("tuple-00", "Trident");
dataSourceStub.put("tuple-01", "definitely");
dataSourceStub.put("tuple-02", "lacks");
dataSourceStub.put("tuple-03", "documentation");
}
@Override
public List<String> multiGet(List<List<Object>> keys) {
System.out.println("DEBUG: MultiGet, keys is " + keys);
List<String> result = new ArrayList<>();
for (List<Object> inputTuple : keys) {
result.add(dataSourceStub.get(inputTuple.get(0)));
}
return result;
}
@Override
public void beginCommit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Begin commit, txid=" + txid);
}
@Override
public void commit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Commit, txid=" + txid);
}
}
Завод:
public static class ExampleStaticStateFactory implements StateFactory {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new ExampleStaticState();
}
}
Просто psvm
(ака public static void main
):
public static void main(String... args) {
TridentTopology tridentTopology = new TridentTopology();
FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
"foo"
}));
TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
tridentTopology
.newStream("spout", spout)
.stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
.each(new Fields("foo", "bar"), new Debug())
;
Config conf = new Config();
conf.setNumWorkers(6);
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());
spout.feed(Arrays.asList(new Values[]{
new Values("tuple-00"),
new Values("tuple-01"),
new Values("tuple-02"),
new Values("tuple-03")
}));
localCluster.shutdown();
}
И, наконец, вывод:
DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
DEBUG: [tuple-00, Trident]
DEBUG: [tuple-01, definitely]
DEBUG: [tuple-02, lacks]
DEBUG: [tuple-03, documentation]
Видите ли, stateQuery() получает значения из входного пакета и сопоставляет их со значениями, найденными в "хранилище данных".
Погружаясь немного глубже, вы можете взглянуть на источник MapGet
класс (парень, чей экземпляр используется для запросов внутри топологии) и найти там следующее:
public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
@Override
public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
return map.multiGet((List) keys);
}
@Override
public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
collector.emit(new Values(result));
}
}
Так что под капотом это просто называет multiGet()
метод вашего ReadOnlyMapState
реализации, а затем выдает значения, найденные в хранилище данных, добавляя их к уже существующему кортежу. Вы можете (хотя это может быть не лучшим решением) создать свою собственную реализацию BaseQueryFunction<ReadOnlyMapState, Object>
делать что-то более сложное.
Есть хорошая документация о состоянии Трайдента в штормовой вики. Простой ответ на ваш вопрос заключается в том, что urlToTweeters
является объектом состояния, к которому можно обратиться Я предполагаю, что приведенное выше утверждение взято из учебника по трезубцу, воспроизведенного ниже:
TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState());
topology.newDRPCStream("reach")
.stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
/* At this point we have the tweeters for each url passed in args */
.shuffle()
.stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
.parallelismHint(200)
.each(new Fields("followers"), new ExpandList(), new Fields("follower"))
.groupBy(new Fields("follower"))
.aggregate(new One(), new Fields("one"))
.parallelismHint(20)
.aggregate(new Count(), new Fields("reach"));
В этом примере urlToTweeters
будет хранить сопоставление URL для твиттеров и DRPC reach
запрос, определенный в следующей строке (который принимает URL в качестве аргументов), в конечном итоге даст охват. Но по пути (отмеченный комментарием в строке) вы увидите поток твитеров каждого URL, т. Е. Результат запроса на urlToTweeters
,