Излучение в несколько потоков в Storm Trident
Как я могу излучать несколько потоков из одного болта в Storm Trident?
У меня есть болт, который делает некоторые вычисления и на основе результата я хочу передать некоторые значения в один поток, а некоторые другие значения в другой поток.
В Storm (не Trident) мы могли бы добиться этого следующим образом:
Разделите поток на несколько потоков:
@Override
public void declareOutputFields(final OutputFieldsDeclarer outputFieldsDeclarer) {
outputFieldsDeclarer.declareStream("type1-stream", new Fields("type1"));
outputFieldsDeclarer.declareStream("type2-stream", new Fields("type2"));
outputFieldsDeclarer.declareStream("error-stream", new Fields("error"));
}
Затем выбросить на основе результатов, как:
collector.emit("type1-stream", new Values("type 1 data"));
collector.emit("type2-stream", new Values("type 2 data"));
collector.emit("error-stream", new Values("error data"));
Затем выполните остальную работу, прослушивая ожидаемый поток:
builder.setBolt("errorBolt", errorBolt).shuffleGrouping("errorBoltStream", "error-stream");
builder.setBolt("type1Bolt", type1Bolt).shuffleGrouping("type1BoltStream", "type1-stream");
Итак, как я могу добиться того же поведения с помощью Storm Trident?
Одним из вариантов является вызов "each" для одного и того же потока и запуск одного и того же болта и его выдача только на основе того, что я хочу излучать в этот поток, или другой вариант - передача пары ключ-значение и фильтрация потока на основе ключа (например, type1, type2, error и т. д.) и снова создайте несколько потоков. Но ни один из них не кажется мне хорошим дизайном. Каков был бы лучший способ достичь этого?
1 ответ
AFAIK, ты не можешь этого сделать. Чтобы разделить поток, вам нужно будет сделать следующее:
// main stream
Stream stream = topology.each(...)
// stream 01
Stream stream1 = stream.each(...)
// stream 02
Stream stream2 = stream.each(...)