Приведет ли возврат Mono <ServerResponse> к (злой) синхронной, блокирующей связи клиент / сервер?
Я новичок в Spring Reactor и WebFlux и немного смущен потоком событий в функциональной сети Spring. Пример: у меня есть функция-обработчик, возвращающаяMono<ServerResponse>
. Внутри негоfindAll()
метод репозитория выполняется, возвращая Flux<T>
. В соответствии с реактивным манифестом, чтобы быть асинхронным, неблокирующим и допускать обратное давление, я хотел бы видетьonNext()
для каждого элемента, возвращенного из репозитория. Однако, просматривая логи сервера во время обработки запроса, я вижу только одинonNext()
событие, что имеет смысл, поскольку мой возвращаемый тип - Mono
содержащий ответ:
Функция маршрутизатора
@Bean
public RouterFunction<ServerResponse> itemsRoute(ItemsHandler itemsHandler) {
return RouterFunctions
.route(GET(ITEMS_ENDPOINT_V2).and(accept(MediaType.APPLICATION_JSON))
, itemsHandler::getAll);
}
Функция обработчика
Mono<ServerResponse> getAll(ServerRequest request) {
return ServerResponse.ok()
.contentType(MediaType.APPLICATION_JSON)
.body(itemRepository.findAll(), Item.class)
.log("GET items");
}
Журнал событий
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onSubscribe([Fuseable] FluxMapFuseable.MapFuseableSubscriber)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | request(unbounded)
2020-05-10 15:10:51.744 INFO 19096 --- [ctor-http-nio-4] GET items : | onNext(org.springframework.web.reactive.function.server.DefaultEntityResponseBuilder$DefaultEntityResponse@83426cc)
2020-05-10 15:10:51.745 INFO 19096 --- [ctor-http-nio-4] GET items : | onComplete()
Напротив, реализация классического метода аннотированного контроллера Spring с Flux<T>
в качестве возвращаемого типа я увижу onNext()
для каждого случая T
(т.е. каждый элемент результирующего набора), который мне кажется более "правильным" (теперь клиент контролирует поток событий и т. д.):
Контроллер
@GetMapping(ITEMS_ENDPOINT_V1)
public Flux<Item> getAll() {
return itemRepository
.findAll()
.log("GET items");
}
Журнал
2020-05-10 15:14:04.135 INFO 19096 --- [ctor-http-nio-5] GET items : onSubscribe(FluxOnErrorResume.ResumeSubscriber)
2020-05-10 15:14:04.136 INFO 19096 --- [ctor-http-nio-5] GET items : request(unbounded)
2020-05-10 15:14:04.137 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=An item, price=4999.0))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=Another item, price=7249.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=Yet another item, price=2399.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=And another item, price=699.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=Aaaaaand another one, price=1.99))
2020-05-10 15:14:04.138 INFO 19096 --- [ntLoopGroup-2-5] GET items : onComplete()
Это смущает. Позвольте мне уточнить:
- С помощью
Mono<ServerResponse>
кажется злом в том смысле, что он инкапсулирует весь набор результатов в одно событие, что для меня похоже на нарушение реактивных принципов асинхронного, неблокирующего потока событий с поддержкой обратного давления. Разве это не лишает клиента контроля? Мне это кажется традиционным, блокирующим общением клиент / сервер. - Возвращение
Flux<T>
напрямую кажется намного приятнее, потому что он позволяет обрабатывать события для каждого результата и контролировать обратное давление.
Мои вопросы:
- Каковы последствия создания
Mono<ServerResponse>
? Это вызовет блокирующее синхронное взаимодействие, испускающееonNext()
только когда все элементы были прочитаны из репо? Потеряю ли я функцию противодавления и т. Д.? - Как я могу заставить серверную часть функционального стиля отправлять
onNext()
для каждого элемента в наборе результатов? - Что было бы наилучшей практикой с точки зрения типа возвращаемого значения функции обработчика функционального стиля, которая является полностью реактивной, то есть неблокирующей, асинхронной и совместимой с противодавлением? Я не уверен
Mono<ServerResponse>
не нарушает эти реактивные принципы.
Я могу ошибаться или упускать что-то важное. Спасибо за вашу помощь!
1 ответ
Все зависит от клиента, потребляющего ServerResponse
. Согласно документам WebFlux (https://docs.spring.io/spring-framework/docs/5.2.x/spring-framework-reference/web-reactive.html), настраивающие функции обработчика для возвратаMono<ServerResponse>
независимо от количества возвращенных элементов, это стандартный способ и абсолютно нормально - пока клиент правильно обрабатывает базовыйFlux<T>
все хорошо. Моя проблема возникла, потому что я тестировал конечные точки, используяcurl
, который не может обнаружить основной Flux
. Использование клиента с функциональным стилем (например,org.springframework.web.reactive.function.client.WebClient
), Mono<ServerResponse>
может быть десериализован в Flux<T>
во-первых, включив все приятные реактивные функции и сделав наш onNext()
события появляются.
Код клиента
Вызов бэкэнда таким образом, десериализация ServerResponse в поток:
@GetMapping(CLIENT_ITEMS_RESOURCE_ENDPOINT_URL)
public Flux<Item> getAllItems(@RequestParam(defaultValue = "true") boolean useRetrieve) {
return webClient.get().uri(SERVER_ITEMS_RESOURCE_V2_ENDPOINT_URL)
.retrieve()
.bodyToFlux(Item.class) // <-- de-serialize the ServerResponse into a Flux
.log("GET all items from server");
}
Приведет к просмотру всех onNext()
события, позволяющие обрабатывать события на стороне клиента:
2020-05-10 16:10:10.504 INFO 10000 --- [ctor-http-nio-2] GET all items from server : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
2020-05-10 16:10:10.504 INFO 10000 --- [ctor-http-nio-2] GET all items from server : request(unbounded)
2020-05-10 16:10:10.511 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1a, description=bla bla, price=4999.0))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1c, description=bla bla bla, price=7249.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1b, description=bla bla bla bla, price=2399.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1d, description=bla bla bla bla bla, price=699.99))
2020-05-10 16:10:10.512 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onNext(Item(id=5eb7f9461a10790e4902ac1e, description=another item, price=1.99))
2020-05-10 16:10:10.513 INFO 10000 --- [ctor-http-nio-8] GET all items from server : onComplete()
Так что все в порядке и полностью реагирует, пока происходит надлежащая обработка ответа клиентом.