vert.x Ожидание ответа на несколько сообщений
В vert.x я могу отправить сообщение в другую статью и "асинхронно ждать" ответа.
Проблема в том, что я хочу отправлять сообщения нескольким статьям и сделать асинхронный обработчик, который будет вызываться при ответе на все статьи.
Это возможно или есть лучший дизайн для достижения этой функциональности?
РЕДАКТИРОВАТЬ:
Предположим, у меня есть вертикаль A, которая отправляет сообщения вершинам B,C и D. Каждая вертикаль (B,C,D) что-то делает с сообщением и возвращает A некоторые данные. Затем вертикаль A получает ответ от B,C, D и что-то делает со всеми данными. Проблема в том, что у меня есть обработчик для каждого отправляемого сообщения (один для A, один для B, один для C), я хочу, чтобы один обработчик вызывался при получении всех ответов.
2 ответа
Начиная с Vert.x 3.2, документация объясняет, как координировать асинхронно, используя Future
а также CompositeFuture
,
Допустим, вы хотите сделать два send
вызывает по шине событий и делает что-то, когда оба преуспевают:
Future<Message> f1 = Future.future();
eventBus.send("first.address", "first message", f1.completer());
Future<Message> f2 = Future.future();
eventBus.send("second.address", "second message", f2.completer());
CompositeFuture.all(f1, f2).setHandler(result -> {
// business as usual
});
В качестве аргументов может быть передано до 6 фьючерсов или, в качестве альтернативы, они могут быть переданы в виде списка.
Наилучшим подходом для этого является использование Reactive Extensions, как это реализовано в Rf.Java от Netflix и предложено модулем RxVertx.
Огромное разнообразие операторов позволяет вам делать такие вещи, как "архивирование" результатов нескольких асинхронных вызовов в новый результат и делать с ним все, что вы захотите.
У меня есть простая демоверсия, доступная на GitHub, которая содержит:
final Observable<JsonObject> meters = observeMetricsSource(metricsAddress, METERS_BUS_REQUEST, "meters", rx);
final Observable<JsonObject> histograms = observeMetricsSource(metricsAddress, HISTOGRAMS_BUS_REQUEST, "histograms", rx);
subscribeAndRespondJson(zip(meters, histograms, (jo1, jo2) -> jo1.mergeIn(jo2)), req);
Этот фрагмент показывает, как две наблюдаемые из двух асинхронных взаимодействий шины событий "упаковываются" (т. Е. Объединяются) в один окончательный HTTP-ответ.