Spring WebFlux rest controller обслуживает только первые две подписки
Там поток создан программно Flux.create
метод:
Flux<Tweet> flux = Flux.<Tweet>create(emitter -> ...);
Есть контроллер отдыха:
@RestController
public class StreamController {
...
@GetMapping("/top-words")
public Flux<TopWords> streamTopWords() {
return topWordsStream.getTopWords();
}
}
Есть пара веб-клиентов (в автономных процессах):
Flux<TopWords> topWordsFlux = WebClient.create(".../top-words")
.method(HttpMethod.GET)
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(TopWords.class)
.subscribe(System.out::println);
В JavaScript есть несколько экземпляров EventSource:
var eventSource = new EventSource(".../top-words");
eventSource.onmessage = function (e) {
console.log("Processing message: ", e.data);
};
Только первые два "подписчика" начнут получать сообщения (независимо от того, является ли это веб-клиентом или экземпляром EventSource). Другой откроет соединение, получит статус HTTP 200, но поток событий останется пустым. Там нет ошибок ни на стороне клиента, ни на стороне сервера.
Я не понимаю, где установлен лимит на "2 подписчика". Что мне делать, если я хочу поддерживать более 2 подписчиков?
Приложение построено с использованием Spring Boot 2.0.0.RELEASE и автоматически настраивается с помощью Spring-boot-starter-webflux. Конфигурация по умолчанию не изменена.
1 ответ
В базовом API есть ограничение, которое я пытался адаптировать (API потоковой передачи Twitter).
Цель состояла в том, чтобы один раз подключиться к Twitter и обработать поток твитов различными подписчиками.
Первоначально я думал, что излучатель перешел к Flux.create
метод всегда использует один и тот же FluxSink
для всех подписчиков. Это, конечно, не имеет смысла. Вместо FluxSink
предоставляется для каждого подписчика, как ясно указывает Javadoc.
Я реализовал свой сценарий использования с помощью прослушивателя Twitter, который позволяет регистрировать (и отменять регистрацию) ряд FluxSink
экземпляров. Таким образом, отдельный поток твитов может быть подписан различными подписчиками.
Flux<Tweet> flux = Flux.<Tweet>create(twitterListener::addSink);
мой twitterListener
инвентарь org.springframework.social.twitter.api.StreamListener
из весенне-социального-твиттер-проекта.