Чтобы передать значение в запрос DRPC из выходного коллектора?
Я пытаюсь реализовать Trident+DRPC. Я разработал топологию так, чтобы она не работала бесконечно. У меня есть два отдельных класса, один для реализации носика, а другой для реализации DRPC и Trident. Мой класс носика (носик, который расширяет IRichSpout) испускает идентификатор клиента. т.е.
public class TriSpout implements IRichSpout{
//some logic here
spoutOutputCollector.emit(new Values(id))
}
Теперь я получил значения из выходного коллектора в другом классе, который реализует Trident с DRPC.
public class TriDrpc{
.....
TriSpout spout=new TriSpout1();
TridentTopology topology = new TridentTopology();
TridentState wordCounts =
topology.newStream("spout1",spout)
.parallelismHint(1)
.each(new Fields("id"), new Compute(), new Fields("value"))
.persistentAggregate(new MemoryMapState.Factory(),
new Count(), new Fields("count"))
и определение топологии drpc выглядит следующим образом
topology.newDRPCStream("Calc", drpc)
.each(new Fields("args"), new Split(), new Fields("word"))
.stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));
Запрос DRPC выглядит следующим образом
public static void main(String[] args) throws Exception {
Config conf = new Config();
if (args.length == 0) {
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("Calculator", conf, buildTopology(drpc));
System.out.println("DRPC RESULT: "
+ drpc.execute("Calc", "id"));
Thread.sleep(1000);
} else {
conf.setNumWorkers(8);
StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
}
}
Теперь в приведенном выше коде, в запросе DRPC, т.е.
System.out.println("DRPC RESULT: " + drpc.execute("Calc", "id"));
"id"
должен совпадать с идентификатором, выдаваемым носиком, т.е. я хочу знать, какой клиент имеет активную учетную запись с использованием этого идентификатора, поэтому мне нужно отправить запрос DRPC для всех идентификаторов, излучаемых носиком. Теперь DRPC находится в основном классе, как я могу передать значение, испускаемое носиком, в Запрос DRPC без указания идентификатора вручную?
Может кто-нибудь помочь, пожалуйста
Отредактировано с новой информацией
1 ответ
Обновить
Ну, теперь понятно, в чем твоя проблема, спасибо.
Таким образом, вам нужно обрабатывать запросы DRPC для тех же идентификаторов, которые испускает тот же самый излив топологии DRPC.
Единственный способ сделать это - сохранить идентификаторы, которые вы излучаете, из своего источника в внешнее постоянное хранилище Storm (например, RDMS или распределенную хэш-карту).
Таким образом, после того, как вы передадите свою топологию для выполнения в кластере Storm, вы можете опрашивать ваше постоянное хранилище на предмет новых идентификаторов и выполнять запрос DRPC для каждого нового идентификатора.
Оригинальный ответ
Я не думаю, что понимаю вопрос. Вы пытаетесь выполнить запросы Storm DRPC с аргументом идентификатора запроса, взятым из выходного сигнала той же топологии DRPC? Я не думаю, что это эффективное и намеренное использование для топологии DRPC. Вы могли бы лучше пойти с обычной топологией.
Топологии DRPC предназначены для конечных вычислений, в то время как обычные топологии используются для непрерывных вычислений. Вызов DRPC принимает имя топологии DRPC и набор входных аргументов для вычисления результата вызова DRPC. Обычные топологии Storm (или Trident) просто работают неопределенно долго, вычисляя какой-то результат и сохраняя его.
Надеюсь, это поможет. Если нет, пожалуйста, переформулируйте свой вопрос лучше, так как не совсем понятно, в чем ваша проблема.