Spring WebFlux с MongoDB - регулирование клиентов SSE
Я работаю над простым чатом, работающим под управлением Spring Boot 2.1.1 с WebFlux, Reactor 3.2.3, Mongo 3.8.2 и Netty 4.1.31.
В каждой комнате чата есть 2 коллекции - архив сообщений и закрытая коллекция с текущими событиями (например, событие нового сообщения, индикаторы ввода пользователя и т. Д.). Ограниченная коллекция имеет 100 элементов, и я использую метод tail() класса ReactiveMongoTemplate для получения последних событий.
Служба предоставляет 2 вида конечных точек для извлечения последних событий: SSE и для опроса. Я провел некоторое стресс-тестирование с 2000 одновременными пользователями, которые, кроме прослушивания чата, рассылали тонны событий.
Наблюдения:
- Опрос каждые 2 секунды создает небольшую нагрузку для службы (~40% использования ЦП во время теста) и почти не вызывает нагрузки к MongoDB (~4%)
- прослушивание через SSE максимально увеличивает MongoDB (~90%), также подчеркивает службу (которая пытается использовать оставшиеся доступные ресурсы), но Mongo испытывает особые трудности, и в целом служба становится почти не отвечающей.
Наблюдение кажется очевидным, потому что когда я подключился через SSE во время теста, он почти мгновенно обновлял меня, когда появлялось новое событие - в основном SSE реагировал в сотни раз быстрее, чем опрос каждые 2 секунды.
Вопрос в том:
Учитывая, что клиент в конечном итоге является подписчиком (или, по крайней мере, я думаю, что это дается ограниченным знанием), могу ли я каким-то образом снизить скорость публикации сообщений с помощью ReactiveMongoTemplate? Или как-то уменьшить спрос на новые события без необходимости делать это на стороне клиента?
Я попытал счастья с буферизацией и кэшированием Flux, однако это вызвало еще больший стресс...
Код:
// ChatRepository.java
private static final Query chatEventsQuery = new Query();
public Flux<ChatEvent> getChatEventsStream(String chatId) {
return reactiveMongoTemplate.tail(
chatEventsQuery,
ChatEvent.class,
chatId
);
}
,
// ChatHandler.java
public Mono<ServerResponse> getChatStream(ServerRequest request) {
String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
String username = getUsername(request);
Flux<ServerSentEvent> chatEventsStream = chatRepository
.getChatEventsStream(chatId)
.map(addUserSpecificPropsToChatEvent(username))
.map(event -> ServerSentEvent.<ChatEvent>builder()
.event(event.getType().getEventName())
.data(event)
.build());
log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
return ServerResponse.ok().body(
chatEventsStream,
ServerSentEvent.class
);
}
,
// ChatRouter.java
RouterFunction<ServerResponse> routes(ChatHandler handler) {
return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}
1 ответ
Ответ: вы делаете это с помощью Flux.buffer
метод. Затем поток будет отправлять события подписчикам массовым образом с определенной скоростью.
Код, который я разместил, имел 2 основных проблемы
Учитывая, что несколько пользователей обычно слушают один чат, я реорганизовал ChatRepository, чтобы использовать преимущества "горячих", воспроизводимых потоков (теперь у меня 1 поток на чат вместо 1 потока на пользователя), который я храню в кэше кофеина. Кроме того, я буферизирую их на короткие промежутки времени, чтобы избежать чрезмерного использования ресурсов при передаче событий клиентам в занятых чатах.
new Query()
Я использовал в ChatRepository был излишним. Я посмотрел на код ReactiveMongoTemplate и, если предоставляется ненулевой запрос, логика немного сложнее. Лучше пройтиnull
ReactiveMongoTemplate'stail()
метод вместо
Пострефакторинг кода
// ChatRepository.java
public Flux<List<ChatEvent>> getChatEventsStream(String chatId) {
return Optional.ofNullable(chatStreamsCache.getIfPresent(chatId))
.orElseGet(newCachedChatEventsStream(chatId))
.autoConnect();
}
private Supplier<ConnectableFlux<List<ChatEvent>>> newCachedChatEventsStream(String chatId) {
return () -> {
ConnectableFlux<List<ChatEvent>> chatEventsStream = reactiveMongoTemplate.tail(
null,
ChatEvent.class,
chatId
).buffer(Duration.ofMillis(chatEventsBufferInterval))
.replay(chatEventsReplayCount);
chatStreamsCache.put(chatId, chatEventsStream);
return chatEventsStream;
};
}
,
// ChatHandler.java
public Mono<ServerResponse> getChatStream(ServerRequest request) {
String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
String username = getUsername(request);
Flux<ServerSentEvent> chatEventsStream = chatRepository
.getChatEventsStream(chatId)
.map(addUserSpecificPropsToChatEvents(username))
.map(event -> ServerSentEvent.<List<ChatEvent>>builder()
.event(CHAT_SSE_NAME)
.data(event)
.build());
log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);
return ServerResponse.ok().body(
chatEventsStream,
ServerSentEvent.class
);
}
,
После применения этих изменений служба работает хорошо даже с 3000 активных пользователей (JVM использует ~50% ЦП, Mongo ~7% в основном из-за большого количества вставок - потоки теперь не так заметны)