Многопоточное решение проблемы - RxJava vs ExecutorService

Я пытаюсь создать приложение, требующее некоторого параллелизма, поскольку важна пропускная способность.

Шаги можно резюмировать следующим образом:

  1. У меня несколько классов AccountCollector. Каждый из них извлекает UserAccounts из двух разных конечных точек REST и объединяет ответы в список. Так

AccountCollector1 -> (AccountRestService1, AccountRestService2) -> список возврата

AccountCollector2 -> (AccountRestService3, AccountRestService4) -> список возврата

  1. В идеале вызовы внутри AccountCollector должны выполняться одновременно. Он должен отправить запросы и подождать, пока оба не вернутся, затем выполнить некоторую обработку результатов и уведомить кого-то, ожидающего результата.

  2. Также в идеале AccountCollectors также должны работать параллельно, они не зависят друг от друга.

Таким образом, существует два уровня параллелизма: AccountCollectors, работающие параллельно, и AccountRestServices, работающие параллельно в каждом AccountCollector.

Я ищу лучшую реализацию для этого.

Я начал с использования Spring Webflux, чтобы AccountRestService возвращал Mono.

Я думал, что RxJava будет идеальным для этого, но мне не удалось найти способ объединить результаты таким образом, чтобы слияние ждало, пока все клиенты REST не вернут Mono или, по крайней мере, тайм-аут / сбой

Итак, я пошел дальше и реализовал параллелизм с помощью ExecutorService (псевдокод ниже). Я также использую ExecutorService для достижения параллелизма между AccountCollectors.

У меня следующие вопросы:

  1. Для меня тот факт, что я смешиваю ExecutorService и реактивное программирование, говорит о том, что что-то не так. Это было бы правильно?
  2. Учитывая, что в будущем количество AccountCollectors может вырасти до сотен, является ли ExecutorService лучшим решением, чем RxJava?
  3. Если нет, то как лучше всего объединить вызовы клиентов REST с помощью RxJava? Какие-либо предложения?

Извините за многословный вопрос, я рад предоставить более подробную информацию. Больше всего меня беспокоит то, что я начал с WebFlux и теперь чувствую, что теряю все преимущества, которые это дает мне.

Спасибо!

      public interface AccountRestService {
    Mono<UserAccount> fetchUserAccount();
}

      public class AccountCollector {

public List<UserAccount> collect() {
        ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
        CompletionService<List<UserAccount> pool = new ExecutorCompletionService<>(executor);

        ///submit to pool two rest clients
        // get from pool, collect
}

}

0 ответов

Другие вопросы по тегам