Неблокирующая потоковая передача данных между двумя сервисами Quarkus (Vert.x с Mutiny в Java)

Обновить!

Я исправил мелкие ошибки в примере кода после решения некоторых проблем, которые не имели отношения к основному вопросу, который все еще касается неблокирующей потоковой передачи между сервисами.

Справочная информация:

Я портирую службу Spring WebFlux под Quarkus. Служба выполняет длительный поиск по множеству огромных наборов данных и возвращает частичные результаты в потоке (текст / поток событий), когда они становятся доступными.

Проблема:

Прямо сейчас я пытаюсь использовать Mutiny Multi с Vert.x под Quarkus, но не могу понять, как служба потребителей может получать этот поток без блокировки.

Во всех примерах потребителем является либо интерфейсная страница JS, либо тип контента производителя - application/json, который, кажется, тупит, пока Multi не завершит работу, прежде чем отправить его в одном объекте JSON (что не имеет смысла в моем приложении).

Вопросы:

  1. Как получить текст / поток событий с помощью Vert.x WebClient со вкусом Mutiny?
  2. Если проблема будет в том, что WebClient не может получать непрерывные потоки: каков стандартный способ потоковой передачи данных между двумя службами Quarkus?

Вот упрощенный пример

Тестовый объект

public class SearchResult implements Serializable {

    private String content;

    public SearchResult(String content) {
        this.content = content;
    }


    //.. toString, getters and setters
    
}

Продюсер 1. простой бесконечный поток -> зависает

@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
        return Multi.createFrom().ticks().every(Duration.ofSeconds(2)              .onItem().transform(n -> new SearchResult(n.toString()));
}

Producer 2. с бесконечным потоком Vertx Paths -> зависает

@Route(path = "/routed", methods = HttpMethod.GET)
public Multi<SearchResult> getSrStreamRouted(RoutingContext context) {
        log.info("routed run");
        return ReactiveRoutes.asEventStream(Multi.createFrom().ticks().every(Duration.ofSeconds(2))
                .onItem().transform(n -> new SearchResult(n.toString()));
}

Производитель 3. простой конечный поток -> блоки до завершения

@GET
@Path("/search")
@Produces(MediaType.SERVER_SENT_EVENTS)
@SseElementType(MediaType.APPLICATION_JSON)
public Multi<SearchResult> getResults() {
    return Multi.createFrom().ticks().every(Duration.ofSeconds(2))
        .transform().byTakingFirstItems(5)
        .onItem().transform(n -> new SearchResult(n.toString()));
}

Потребитель:

Пробовал несколько разных решений как на стороне производителя, так и на стороне потребителя, но в каждом случае поток блокируется до завершения или зависает на неопределенный срок без передачи данных для бесконечных потоков. Я получаю те же результаты с httpie. Вот последняя итерация:

WebClientOptions webClientOptions = new WebClientOptions().setDefaultHost("localhost").setDefaultPort(8182);
WebClient client = WebClient.create(vertx, webClientOptions);
        
client.get("/string")
                .send()
                .onFailure().invoke(resp -> log.error("error: " + resp))
                .onItem().invoke(resp -> log.info("result: " + resp.statusCode()))
                .toMulti()
                .subscribe().with(r -> log.info(String.format("Subscribe: code:%d body:%s",r.statusCode(), r.bodyAsString())));

1 ответ

Решение

Веб-клиент Vert.x не работает с SSE (без конфигурации). С https://vertx.io/docs/vertx-web-client/java/:

Ответы полностью буферизованы, используйте BodyCodec.pipe для передачи ответа в поток записи.

Он ждет, пока не завершится ответ. Вы можете использовать необработанный HTTP-клиент Vert.x или использоватьpipeкодек. Примеры приведены на https://vertx.io/docs/vertx-web-client/java/.

В качестве альтернативы вы можете использовать клиент SSE, например: https://github.com/quarkusio/quarkus-quickstarts/blob/master/kafka-quickstart/src/test/java/org/acme/kafka/PriceResourceTest.java

Другие вопросы по тегам