Решение CompletableFuture для решения Reactor (или Akka)
У меня есть следующий метод, используя CompletableFuture
как это:
public AClass aMethod() {
CompletableFuture<SomeClassA> someClassAFuture =
CompletableFuture.supplyAsync(() -> someMethodThatReturnsA());
CompletableFuture<SomeClassB> someClassBFuture =
CompletableFuture.supplyAsync(() -> someMethodThatReturnsB());
CompletableFuture<SomeClassC> someClassCFuture =
CompletableFuture.supplyAsync(() -> someMethodThatReturnsC());
CompletableFuture.allOf(someClassAFuture, someClassBFuture, someClassCFuture).join();
return new AClass(someClassAFuture.join(), someClassBFuture.join(), someClassCFuture.join());
}
Этот код имеет проблему тупика, когда T
потоки одновременно входят в метод, если в пуле соединений вил меньше потоков, чем T * 3
(потому что ни один из allOf
вызовы могут завершаться, и они не вернут в пул текущие занятые потоки).
Единственный способ решить эту проблему - ограничить количество одновременных потоков внутри метода (используя Spring @Async
аннотации с исполнителем потока) или увеличение потоков в пуле соединений форка.
Я хотел бы лучшего решения, где я могу полностью забыть о размере пула потоков. Как я могу переписать это с помощью Reactor или Akka?
1 ответ
Реализация во фьючерсах Akka будет выглядеть примерно так (полностью не проверено):
Future< SomeClassA > f1 = future(() -> someMethodThatReturnsA(), system.dispatcher());
Future< SomeClassB > f2 = future(() -> someMethodThatReturnsB(), system.dispatcher());
Future< SomeClassC > f3 = future(() -> someMethodThatReturnsC(), system.dispatcher());
List<Future<Object>> futures = Arrays.asList(f1, f2, f3);
return sequence(futures).map((results) -> new AClass(results.get(0),results.get(1),results.get(2)));
Может потребоваться дополнительная работа для разбора результата фьючерса перед созданием AClass
, Обратите внимание, что вы возвращаете Future< AClass >
сейчас в aMethod
Хотя проблема с вашим кодом в том, что он блокируется. Вы пытались присоединиться ко всем CompletableFutures, используя thenApply
а также thenCompose
вернуть CompletableFuture<AClass>
?