Чтобы передать значение в запрос DRPC из выходного коллектора?

Я пытаюсь реализовать Trident+DRPC. Я разработал топологию так, чтобы она не работала бесконечно. У меня есть два отдельных класса, один для реализации носика, а другой для реализации DRPC и Trident. Мой класс носика (носик, который расширяет IRichSpout) испускает идентификатор клиента. т.е.

public class TriSpout implements IRichSpout{
  //some logic here
    spoutOutputCollector.emit(new Values(id))
  }

Теперь я получил значения из выходного коллектора в другом классе, который реализует Trident с DRPC.

public class TriDrpc{

    .....
    TriSpout spout=new TriSpout1();        
    TridentTopology topology = new TridentTopology();  
    TridentState wordCounts =
          topology.newStream("spout1",spout)
            .parallelismHint(1)
            .each(new Fields("id"), new Compute(), new Fields("value"))
            .persistentAggregate(new MemoryMapState.Factory(),
                                 new Count(), new Fields("count"))   

и определение топологии drpc выглядит следующим образом

topology.newDRPCStream("Calc", drpc)
         .each(new Fields("args"), new Split(), new Fields("word"))                
         .stateQuery(wordCounts, new Fields("word"), new MapGet(), new Fields("count"));         

Запрос DRPC выглядит следующим образом

public static void main(String[] args) throws Exception {
    Config conf = new Config(); 

    if (args.length == 0) {
    LocalDRPC drpc = new LocalDRPC();
    LocalCluster cluster = new LocalCluster();
    cluster.submitTopology("Calculator", conf,   buildTopology(drpc));          
    System.out.println("DRPC RESULT: "
                + drpc.execute("Calc", "id"));
    Thread.sleep(1000);

    } else {
        conf.setNumWorkers(8);
        StormSubmitter.submitTopology(args[0], conf, buildTopology(null));
    }
}

Теперь в приведенном выше коде, в запросе DRPC, т.е.

System.out.println("DRPC RESULT: " + drpc.execute("Calc", "id"));

"id" должен совпадать с идентификатором, выдаваемым носиком, т.е. я хочу знать, какой клиент имеет активную учетную запись с использованием этого идентификатора, поэтому мне нужно отправить запрос DRPC для всех идентификаторов, излучаемых носиком. Теперь DRPC находится в основном классе, как я могу передать значение, испускаемое носиком, в Запрос DRPC без указания идентификатора вручную?

Может кто-нибудь помочь, пожалуйста

Отредактировано с новой информацией

1 ответ

Обновить

Ну, теперь понятно, в чем твоя проблема, спасибо.

Таким образом, вам нужно обрабатывать запросы DRPC для тех же идентификаторов, которые испускает тот же самый излив топологии DRPC.

Единственный способ сделать это - сохранить идентификаторы, которые вы излучаете, из своего источника в внешнее постоянное хранилище Storm (например, RDMS или распределенную хэш-карту).

Таким образом, после того, как вы передадите свою топологию для выполнения в кластере Storm, вы можете опрашивать ваше постоянное хранилище на предмет новых идентификаторов и выполнять запрос DRPC для каждого нового идентификатора.

Оригинальный ответ

Я не думаю, что понимаю вопрос. Вы пытаетесь выполнить запросы Storm DRPC с аргументом идентификатора запроса, взятым из выходного сигнала той же топологии DRPC? Я не думаю, что это эффективное и намеренное использование для топологии DRPC. Вы могли бы лучше пойти с обычной топологией.

Топологии DRPC предназначены для конечных вычислений, в то время как обычные топологии используются для непрерывных вычислений. Вызов DRPC принимает имя топологии DRPC и набор входных аргументов для вычисления результата вызова DRPC. Обычные топологии Storm (или Trident) просто работают неопределенно долго, вычисляя какой-то результат и сохраняя его.

Надеюсь, это поможет. Если нет, пожалуйста, переформулируйте свой вопрос лучше, так как не совсем понятно, в чем ваша проблема.

Другие вопросы по тегам