Spring webflux "Разрешено только одному соединению получать подписчика", если ответ сервера от switchIfEmpty
Я хотел бы поставить случай, когда, если объект существует, то отправить ошибку, если нет, то создать нового пользователя.
вот мой обработчик:
public Mono<ServerResponse> createUser(ServerRequest request) {
Mono<UserBO> userBOMono = request.bodyToMono(UserBO.class);
Mono<String> email = userBOMono.map(UserBO::getEmail);
Mono<User> userMono = email.flatMap(userRepository::findByEmail);
return userMono.flatMap(user -> {
Mono<ErrorResponse> errorResponseMono = errorHanlder.handleEmailAlreadyExist();
return ServerResponse.status(HttpStatus.CONFLICT)
.contentType(MediaType.APPLICATION_JSON)
.body(errorResponseMono, ErrorResponse.class);
}).switchIfEmpty(Mono.defer(() -> {
Mono<User> newUserMono = userBOMono.flatMap(userMapping::mapUserBOToUser);
Mono<User> dbUserMono = newUserMono.flatMap(userRepository::save);
return ServerResponse.status(HttpStatus.CREATED)
.contentType(MediaType.APPLICATION_JSON)
.body(dbUserMono, User.class);
}));
если Mono не пустой, то его возвращаемый конфликт - то, что я хочу, если, если пусто, то создайте новый, но выдает ошибку ниже:
java.lang.IllegalStateException: Only one connection receive subscriber allowed.
at reactor.ipc.netty.channel.FluxReceive.startReceiver(FluxReceive.java:276) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
at reactor.ipc.netty.channel.FluxReceive.lambda$subscribe$2(FluxReceive.java:127) ~[reactor-netty-0.7.8.RELEASE.jar:0.7.8.RELEASE]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:163) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:404) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:464) ~[netty-transport-4.1.27.Final.jar:4.1.27.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:884) ~[netty-common-4.1.27.Final.jar:4.1.27.Final]
at java.lang.Thread.run(Thread.java:748) ~[na:1.8.0_131]
Примечание об обновлении: его правильное поведение согласно определению метода:
switchIfEmpty(Mono<? extends T> alternate)
Fallback to an alternative Mono if this mono is completed without data
Значит, когда я отправляю пустой моно в кузове, его работа в порядке:
return ServerResponse.status(HttpStatus.CREATED)
.contentType(MediaType.APPLICATION_JSON)
.body(Mono.empty(), User.class);
Итак, каково решение для обработки swtichIfEmpty, если я хотел бы отправить объект Mono в качестве возврата из него.
3 ответа
Наконец я смог ее решить, я дважды читал поток userBOMono, который вызывал эту ошибку через webflux.
так вот обновленный код, который работает нормально.
public Mono<ServerResponse> createUser(ServerRequest request) {
Mono<UserBO> userBOMono = request.bodyToMono(UserBO.class);
return userBOMono.flatMap(userBO -> {
String email = userBO.getEmail();
Mono<User> userMono = userRepository.findByEmail(email);
return userMono.flatMap(user -> errorHandler.handleEmailAlreadyExist())
.switchIfEmpty(Mono.defer(() -> createNewUser(userBO)));
});
}
private Mono<ServerResponse> createNewUser(UserBO userBO) {
Mono<User> userMono = Mono.just(userBO).flatMap(userMapping::mapUserBOToUser).flatMap(userRepository::save);
return ServerResponse.ok().contentType(MediaType.APPLICATION_JSON)
.body(userMono, User.class);
}
Можете ли вы также поделиться своим клиентским кодом? Я предполагаю, что вы используете WebClient для вызова этого API. Клиент не должен подписываться более одного раза, иначе может появиться эта ошибка.
У меня такая же ошибка при запуске класса @SpringBootTest .
Проблема, похоже, в том, что ответ писался, когда методы уже были закрыты.
Решено путем передачи " Mono.empty() " вместо полного ответа.
Код до:
WebClient.create()
.get()
.uri(new URI(UPDATE_COMPANIES_URL))
.exchangeToMono(response -> {
if (response.statusCode().equals(HttpStatus.OK)) {
return response.bodyToMono(Boolean.class).thenReturn(Boolean.TRUE);
} else {
System.out.println("[sendSecureRequest] Error sending request: " + response.statusCode());
return response.bodyToMono(Boolean.class).thenReturn(Boolean.FALSE);
}
}).subscribe();
Код после:
WebClient.create()
.get()
.uri(new URI(UPDATE_COMPANIES_URL))
.exchangeToMono(response -> {
if (response.statusCode().equals(HttpStatus.OK)) {
// TODO handle success
} else {
System.out.println("[sendSecureRequest] Error sending request: " + response.statusCode());
}
return Mono.empty();
}).subscribe();