Как ограничить количество открытых сокетов в Spring-webflux WebClient?
У меня есть RESTful-сервис, и у меня была идея подготовить простой тест производительности с Reactor и Spring WebClient. Бенчмарк просто создает N пользователей, а затем за каждый созданный пользовательский пост M голосует.
К сожалению, следующий код превышает максимально допустимое количество открытых файлов на моем компьютере с Linux, равное 1024 (ulimit -n 1024
).
RestService restService = ...
int N_ITERATIONS = 100;
int M_VOTES = 100;
Flux.range(0, N_ITERATIONS)
.parallel()
.runOn(Schedulers.parallel())
.flatMap(iteration -> restService.postUserRegistration(User.builder().build()))
.flatMap(user -> Flux.range(0, M_VOTES)
.flatMap(vote -> restService.postUserVote(Vote.builder().build()))
.collectList()
.map(votes -> Tuple.of(user, votes))
).doOnNext(userVotes -> log.info("User: {} voted: {}", userVotes._1(), userVotes._2()))
.sequential()
.toIterable();
RestService реализован со стандартным WebClient от Spring Webflux.
Есть ли способ ограничить количество созданных сокетов на основе системного лимита?
Трассировки стека:
Caused by: io.netty.channel.unix.Errors$NativeIoException: newSocketStream(..) failed: Too many open files
at io.netty.channel.unix.Errors.newIOException(Errors.java:122) ~[netty-transport-native-unix-common-4.1.27.Final.jar:4.1.27.Final]
... 98 common frames omitted
1 ответ
Я не думаю, что есть. Но вы могли бы принять меры, чтобы предотвратить это.
Во-первых, почему ваш файловый дескриптор ограничен настолько низко? Linux открывает файловый дескриптор для каждого открытого сокета, поэтому 1024 очень мало, если вы хотите иметь много открытых сокетов одновременно. Я хотел бы рассмотреть вопрос об увеличении этого лимита.
Во-вторых, вы оставляете конфигурацию параллелизма вплоть до планировщика. Вы должны знать, что есть вариант flatMap
оператор, который позволяет контролировать, сколько Publisher
могут быть подписаны и объединены параллельно:
Flux<V> flatMap(
Function<? super T,? extends Publisher<? extends V>> mapper,
int concurrency)
С использованием concurrency
Параметр вы сможете определить, сколько последовательностей в полете вы хотите разрешить.