Spring WebFlux с MongoDB - регулирование клиентов SSE

Я работаю над простым чатом, работающим под управлением Spring Boot 2.1.1 с WebFlux, Reactor 3.2.3, Mongo 3.8.2 и Netty 4.1.31.

В каждой комнате чата есть 2 коллекции - архив сообщений и закрытая коллекция с текущими событиями (например, событие нового сообщения, индикаторы ввода пользователя и т. Д.). Ограниченная коллекция имеет 100 элементов, и я использую метод tail() класса ReactiveMongoTemplate для получения последних событий.

Служба предоставляет 2 вида конечных точек для извлечения последних событий: SSE и для опроса. Я провел некоторое стресс-тестирование с 2000 одновременными пользователями, которые, кроме прослушивания чата, рассылали тонны событий.

Наблюдения:

  • Опрос каждые 2 секунды создает небольшую нагрузку для службы (~40% использования ЦП во время теста) и почти не вызывает нагрузки к MongoDB (~4%)
  • прослушивание через SSE максимально увеличивает MongoDB (~90%), также подчеркивает службу (которая пытается использовать оставшиеся доступные ресурсы), но Mongo испытывает особые трудности, и в целом служба становится почти не отвечающей.

Наблюдение кажется очевидным, потому что когда я подключился через SSE во время теста, он почти мгновенно обновлял меня, когда появлялось новое событие - в основном SSE реагировал в сотни раз быстрее, чем опрос каждые 2 секунды.

Вопрос в том:

Учитывая, что клиент в конечном итоге является подписчиком (или, по крайней мере, я думаю, что это дается ограниченным знанием), могу ли я каким-то образом снизить скорость публикации сообщений с помощью ReactiveMongoTemplate? Или как-то уменьшить спрос на новые события без необходимости делать это на стороне клиента?

Я попытал счастья с буферизацией и кэшированием Flux, однако это вызвало еще больший стресс...

Код:

// ChatRepository.java

private static final Query chatEventsQuery = new Query();

public Flux<ChatEvent> getChatEventsStream(String chatId) {
    return reactiveMongoTemplate.tail(
            chatEventsQuery,
            ChatEvent.class,
            chatId
    );
}

,

// ChatHandler.java

public Mono<ServerResponse> getChatStream(ServerRequest request) {

    String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
    String username = getUsername(request);

    Flux<ServerSentEvent> chatEventsStream = chatRepository
            .getChatEventsStream(chatId)
            .map(addUserSpecificPropsToChatEvent(username))
            .map(event -> ServerSentEvent.<ChatEvent>builder()
                    .event(event.getType().getEventName())
                    .data(event)
                    .build());

    log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);

    return ServerResponse.ok().body(
            chatEventsStream,
            ServerSentEvent.class
    );
}

,

// ChatRouter.java

RouterFunction<ServerResponse> routes(ChatHandler handler) {
    return route(GET("/api/chat/{chatId}/stream"), handler::getChatStream);
}

1 ответ

Решение

Ответ: вы делаете это с помощью Flux.buffer метод. Затем поток будет отправлять события подписчикам массовым образом с определенной скоростью.

Код, который я разместил, имел 2 основных проблемы

  1. Учитывая, что несколько пользователей обычно слушают один чат, я реорганизовал ChatRepository, чтобы использовать преимущества "горячих", воспроизводимых потоков (теперь у меня 1 поток на чат вместо 1 потока на пользователя), который я храню в кэше кофеина. Кроме того, я буферизирую их на короткие промежутки времени, чтобы избежать чрезмерного использования ресурсов при передаче событий клиентам в занятых чатах.

  2. new Query() Я использовал в ChatRepository был излишним. Я посмотрел на код ReactiveMongoTemplate и, если предоставляется ненулевой запрос, логика немного сложнее. Лучше пройти nullReactiveMongoTemplate's tail() метод вместо

Пострефакторинг кода

// ChatRepository.java

public Flux<List<ChatEvent>> getChatEventsStream(String chatId) {
    return Optional.ofNullable(chatStreamsCache.getIfPresent(chatId))
            .orElseGet(newCachedChatEventsStream(chatId))
            .autoConnect();
}

private Supplier<ConnectableFlux<List<ChatEvent>>> newCachedChatEventsStream(String chatId) {
    return () -> {
        ConnectableFlux<List<ChatEvent>> chatEventsStream = reactiveMongoTemplate.tail(
                null,
                ChatEvent.class,
                chatId
        ).buffer(Duration.ofMillis(chatEventsBufferInterval))
                .replay(chatEventsReplayCount);

        chatStreamsCache.put(chatId, chatEventsStream);

        return chatEventsStream;
    };
}

,

// ChatHandler.java

public Mono<ServerResponse> getChatStream(ServerRequest request) {

    String chatId = request.pathVariable(CHAT_ID_PATH_VARIABLE);
    String username = getUsername(request);

    Flux<ServerSentEvent> chatEventsStream = chatRepository
            .getChatEventsStream(chatId)
            .map(addUserSpecificPropsToChatEvents(username))
            .map(event -> ServerSentEvent.<List<ChatEvent>>builder()
                    .event(CHAT_SSE_NAME)
                    .data(event)
                    .build());

    log.debug("\nExposing chat stream\nchat: {}\nuser: {}", chatId, username);

    return ServerResponse.ok().body(
            chatEventsStream,
            ServerSentEvent.class
    );
}

,

После применения этих изменений служба работает хорошо даже с 3000 активных пользователей (JVM использует ~50% ЦП, Mongo ~7% в основном из-за большого количества вставок - потоки теперь не так заметны)

Другие вопросы по тегам