Реализация сервера с использованием реактивных потоков Spring
Я работаю над инструментом, который прослушивает потоки на нескольких узлах Redis. Новые узлы могут быть добавлены / удалены в зависимости от нагрузки. У меня есть планировщик, который запускается каждую секунду в поисках активных в данный момент узлов (API дает мне это) В основном настройка следующая (стараюсь изо всех сил визуализировать:-)):
Вот как я слушаю стримы:
activeNodes = <Poll and get this list every 1 second to see if there are any Redis nodes added/removed>
for(int i=0; i< activeNodes.size(); i++) {
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(activeNodes.get(i).getFactory());
receiver
.receiveAutoAck(Consumer.from("consumer-group", "consumer"),
StreamOffset.create("my-stream", ReadOffset.lastConsumed()))
.flatMap(record -> Mono.fromCallable(() -> process(record)) // process request
.subscribeOn(Schedulers.boundedElastic())) //--> Use the CPU
.subscribe();
}
В основном это похоже на веб-сервер, который одновременно обрабатывает клиентские запросы. Теперь я столкнулся с такой проблемой:
- Как справедливо распределить процессор между всеми запросами независимо от количества создаваемых мной подписчиков Flux? В приведенном выше коде я создаю ограниченный эластичный планировщик для каждого потокового соединения, и он может создать слишком много потоков, поскольку я добавляю больше подписок на новые потоки Redis.
- Как мне не отменить подписку, если возникла проблема с подключением? Прямо сейчас, если есть ошибка, он просто отменяет подписку и не обрабатывает дальнейшие сообщения.