Неблокирующая потоковая передача данных между двумя сервисами Quarkus (Vert.x с Mutiny в Java)
Обновить!
Я исправил мелкие ошибки в примере кода после решения некоторых проблем, которые не имели отношения к основному вопросу, который все еще касается неблокирующей потоковой передачи между сервисами.
Справочная информация:
Я портирую службу Spring WebFlux под Quarkus. Служба выполняет длительный поиск по множеству огромных наборов данных и возвращает частичные результаты в потоке (текст / поток событий), когда они становятся доступными.
Проблема:
Прямо сейчас я пытаюсь использовать Mutiny Multi с Vert.x под Quarkus, но не могу понять, как служба потребителей может получать этот поток без блокировки.
Во всех примерах потребителем является либо интерфейсная страница JS, либо тип контента производителя - application/json, который, кажется, тупит, пока Multi не завершит работу, прежде чем отправить его в одном объекте JSON (что не имеет смысла в моем приложении).
Вопросы:
- Как получить текст / поток событий с помощью Vert.x WebClient со вкусом Mutiny?
- Если проблема будет в том, что WebClient не может получать непрерывные потоки: каков стандартный способ потоковой передачи данных между двумя службами Quarkus?
Вот упрощенный пример
Тестовый объект
public class SearchResult implements Serializable {
private String content;
public SearchResult(String content) {
this.content = content;
}
//.. toString, getters and setters
}
Продюсер 1. простой бесконечный поток -> зависает
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(2) .onItem().transform(n -> new SearchResult(n.toString()));
}
Producer 2. с бесконечным потоком Vertx Paths -> зависает
@Route(path = "/routed", methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
log.info("routed run");
return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.onItem().transform(n -> new SearchResult(n.toString()));
}
Производитель 3. простой конечный поток -> блоки до завершения
@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
.transform().byTakingFirstItems(5)
.onItem().transform(n -> new SearchResult(n.toString()));
}
Потребитель:
Пробовал несколько разных решений как на стороне производителя, так и на стороне потребителя, но в каждом случае поток блокируется до завершения или зависает на неопределенный срок без передачи данных для бесконечных потоков. Я получаю те же результаты с httpie. Вот последняя итерация:
WebClientOptions webClientOptions = new WebClientOptions().setDefaultHost("localhost").setDefaultPort(8182);
WebClient client = WebClient.create(vertx, webClientOptions);
client.get("/string")
.send()
.onFailure().invoke(resp -> log.error("error: " + resp))
.onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
.toMulti()
.subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",r.statusCode(), r.bodyAsString())));
1 ответ
Веб-клиент Vert.x не работает с SSE (без конфигурации). С https://vertx.io/docs/vertx-web-client/java/:
Ответы полностью буферизованы, используйте BodyCodec.pipe для передачи ответа в поток записи.
Он ждет, пока не завершится ответ. Вы можете использовать необработанный HTTP-клиент Vert.x или использоватьpipe
кодек. Примеры приведены на https://vertx.io/docs/vertx-web-client/java/.
В качестве альтернативы вы можете использовать клиент SSE, например: https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java