Приведет ли возврат Mono <ServerResponse> к (злой) синхронной, блокирующей связи клиент / сервер?

Я новичок в Spring Reactor и WebFlux и немного смущен потоком событий в функциональной сети Spring. Пример: у меня есть функция-обработчик, возвращающаяMono<ServerResponse>. Внутри негоfindAll() метод репозитория выполняется, возвращая Flux<T>. В соответствии с реактивным манифестом, чтобы быть асинхронным, неблокирующим и допускать обратное давление, я хотел бы видетьonNext()для каждого элемента, возвращенного из репозитория. Однако, просматривая логи сервера во время обработки запроса, я вижу только одинonNext() событие, что имеет смысл, поскольку мой возвращаемый тип - Mono содержащий ответ:

Функция маршрутизатора

@Bean
 public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
     return RouterFunctions
             .route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
                     , itemsHandler::getAll);
}

Функция обработчика

Mono<ServerResponse> getAll(ServerRequest request) {
    return ServerResponse.ok()
            .contentType(MediaType.APPLICATION_JSON)
            .body(itemRepository.findAll(), Item.class)
            .log("GET items");
}

Журнал событий

2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | request(unbounded)
2020-05-10 15:10:51.744  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745  INFO 19096 --- [ctor-http-nio-4] GET items                                : | onComplete()

Напротив, реализация классического метода аннотированного контроллера Spring с Flux<T> в качестве возвращаемого типа я увижу onNext() для каждого случая T (т.е. каждый элемент результирующего набора), который мне кажется более "правильным" (теперь клиент контролирует поток событий и т. д.):

Контроллер

@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
    return itemRepository
            .findAll()
            .log("GET items");
}

Журнал

2020-05-10 15:14:04.135  INFO 19096 --- [ctor-http-nio-5] GET items                                : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136  INFO 19096 --- [ctor-http-nio-5] GET items                                : request(unbounded)
2020-05-10 15:14:04.137  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138  INFO 19096 --- [ntLoopGroup-2-5] GET items                                : onComplete()

Это смущает. Позвольте мне уточнить:

  • С помощью Mono<ServerResponse>кажется злом в том смысле, что он инкапсулирует весь набор результатов в одно событие, что для меня похоже на нарушение реактивных принципов асинхронного, неблокирующего потока событий с поддержкой обратного давления. Разве это не лишает клиента контроля? Мне это кажется традиционным, блокирующим общением клиент / сервер.
  • Возвращение Flux<T> напрямую кажется намного приятнее, потому что он позволяет обрабатывать события для каждого результата и контролировать обратное давление.

Мои вопросы:

  • Каковы последствия создания Mono<ServerResponse>? Это вызовет блокирующее синхронное взаимодействие, испускающееonNext()только когда все элементы были прочитаны из репо? Потеряю ли я функцию противодавления и т. Д.?
  • Как я могу заставить серверную часть функционального стиля отправлять onNext() для каждого элемента в наборе результатов?
  • Что было бы наилучшей практикой с точки зрения типа возвращаемого значения функции обработчика функционального стиля, которая является полностью реактивной, то есть неблокирующей, асинхронной и совместимой с противодавлением? Я не уверенMono<ServerResponse> не нарушает эти реактивные принципы.

Я могу ошибаться или упускать что-то важное. Спасибо за вашу помощь!

1 ответ

Решение

Все зависит от клиента, потребляющего ServerResponse. Согласно документам WebFlux (https://docs.spring.io/spring-framework/docs/5.2.x/spring-framework-reference/web-reactive.html), настраивающие функции обработчика для возвратаMono<ServerResponse>независимо от количества возвращенных элементов, это стандартный способ и абсолютно нормально - пока клиент правильно обрабатывает базовыйFlux<T>все хорошо. Моя проблема возникла, потому что я тестировал конечные точки, используяcurl, который не может обнаружить основной Flux. Использование клиента с функциональным стилем (например,org.springframework.web.reactive.function.client.WebClient), Mono<ServerResponse> может быть десериализован в Flux<T> во-первых, включив все приятные реактивные функции и сделав наш onNext() события появляются.

Код клиента

Вызов бэкэнда таким образом, десериализация ServerResponse в поток:

@GetMapping(CLIENT_ITEMS_RESOURCE_ENDPOINT_URL)
public Flux<Item> getAllItems(@RequestParam(defaultValue = "true") boolean useRetrieve) {
    return  webClient.get().uri(SERVER_ITEMS_RESOURCE_V2_ENDPOINT_URL)
            .retrieve()
            .bodyToFlux(Item.class) // <-- de-serialize the ServerResponse into a Flux
            .log("GET all items from server");
}

Приведет к просмотру всех onNext() события, позволяющие обрабатывать события на стороне клиента:

2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2020-05-10 16:10:10.504  INFO 10000 --- [ctor-http-nio-2] GET all items from server   : request(unbounded)
2020-05-10 16:10:10.511  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=bla bla, price=4999.0))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=bla bla bla, price=7249.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=bla bla bla bla, price=2399.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=bla bla bla bla bla, price=699.99))
2020-05-10 16:10:10.512  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=another item, price=1.99))
2020-05-10 16:10:10.513  INFO 10000 --- [ctor-http-nio-8] GET all items from server   : onComplete()

Так что все в порядке и полностью реагирует, пока происходит надлежащая обработка ответа клиентом.

Другие вопросы по тегам