Реализация сервера с использованием реактивных потоков Spring

Я работаю над инструментом, который прослушивает потоки на нескольких узлах Redis. Новые узлы могут быть добавлены / удалены в зависимости от нагрузки. У меня есть планировщик, который запускается каждую секунду в поисках активных в данный момент узлов (API дает мне это) В основном настройка следующая (стараюсь изо всех сил визуализировать:-)):

Вот как я слушаю стримы:

       activeNodes = <Poll and get this list every 1 second to see if there are any Redis nodes added/removed>
for(int i=0; i< activeNodes.size(); i++) {
    StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(activeNodes.get(i).getFactory()); 
    receiver
        .receiveAutoAck(Consumer.from("consumer-group", "consumer"),
            StreamOffset.create("my-stream", ReadOffset.lastConsumed()))
        .flatMap(record -> Mono.fromCallable(() -> process(record)) // process request
        .subscribeOn(Schedulers.boundedElastic())) //--> Use the CPU
        .subscribe();
}

В основном это похоже на веб-сервер, который одновременно обрабатывает клиентские запросы. Теперь я столкнулся с такой проблемой:

  1. Как справедливо распределить процессор между всеми запросами независимо от количества создаваемых мной подписчиков Flux? В приведенном выше коде я создаю ограниченный эластичный планировщик для каждого потокового соединения, и он может создать слишком много потоков, поскольку я добавляю больше подписок на новые потоки Redis.
  2. Как мне не отменить подписку, если возникла проблема с подключением? Прямо сейчас, если есть ошибка, он просто отменяет подписку и не обрабатывает дальнейшие сообщения.

0 ответов