Spring WebFlux rest controller обслуживает только первые две подписки

Там поток создан программно Flux.create метод:

Flux<Tweet> flux = Flux.<Tweet>create(emitter -> ...);

Есть контроллер отдыха:

@RestController
public class StreamController {
    ...

    @GetMapping("/top-words")
    public Flux<TopWords> streamTopWords() {
        return topWordsStream.getTopWords();
    }
}

Есть пара веб-клиентов (в автономных процессах):

Flux<TopWords> topWordsFlux = WebClient.create(".../top-words")
        .method(HttpMethod.GET)
        .accept(MediaType.TEXT_EVENT_STREAM)
        .retrieve()
        .bodyToFlux(TopWords.class)
        .subscribe(System.out::println);

В JavaScript есть несколько экземпляров EventSource:

var eventSource = new EventSource(".../top-words");

eventSource.onmessage = function (e) {
    console.log("Processing message: ", e.data);
};

Только первые два "подписчика" начнут получать сообщения (независимо от того, является ли это веб-клиентом или экземпляром EventSource). Другой откроет соединение, получит статус HTTP 200, но поток событий останется пустым. Там нет ошибок ни на стороне клиента, ни на стороне сервера.

Я не понимаю, где установлен лимит на "2 подписчика". Что мне делать, если я хочу поддерживать более 2 подписчиков?

Приложение построено с использованием Spring Boot 2.0.0.RELEASE и автоматически настраивается с помощью Spring-boot-starter-webflux. Конфигурация по умолчанию не изменена.

1 ответ

Решение

В базовом API есть ограничение, которое я пытался адаптировать (API потоковой передачи Twitter).

Цель состояла в том, чтобы один раз подключиться к Twitter и обработать поток твитов различными подписчиками.

Первоначально я думал, что излучатель перешел к Flux.create метод всегда использует один и тот же FluxSink для всех подписчиков. Это, конечно, не имеет смысла. Вместо FluxSink предоставляется для каждого подписчика, как ясно указывает Javadoc.

Я реализовал свой сценарий использования с помощью прослушивателя Twitter, который позволяет регистрировать (и отменять регистрацию) ряд FluxSink экземпляров. Таким образом, отдельный поток твитов может быть подписан различными подписчиками.

Flux<Tweet> flux = Flux.<Tweet>create(twitterListener::addSink);

мой twitterListener инвентарь org.springframework.social.twitter.api.StreamListener из весенне-социального-твиттер-проекта.

Другие вопросы по тегам