Как остановить основной поток для завершения всех вызовов Mono?
Я делаю несколько моно-вызовов к БД. И результат всех моно-ответов необходим для вычисления окончательного результата, который записывается после объявленной логики моно.
if (SomeObject.getAccountLevelActiveList() != null) {
SomeObject.getAccountLevelActiveList().parallelStream().forEach(account -> {
Mono<SubLine> subLineMono= SubLineService
.getLineLevelCustProfile(preNbsLineLevelConverter.getSubLine(account ));
subLineMono.subscribe(subLine-> PollObject.getSubList()
.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine)));
});
}
Но моя основная логика выполняется до того, как моно-результат будет сохранен в PollObject. поэтому я получаю null в PollObject. Поэтому я хочу остановить свой основной поток, пока результаты Mono не будут сохранены в PollObject.
1 ответ
Если вы хотите остановить основной поток, вы можете использовать блокировку вместо подписки, но сначала вы должны преобразовать
List
в
Flux
а потом
flatMap
-это использование предоставленных
Mono
. Логика, которая у вас есть в
subscribe
метод можно переместить в оператор побочного эффекта
doOnNext
любой из
Mono
или упаковка
Flux
:
Flux.fromIterable(SomeObject.getAccountLevelActiveList())
.flatMap(account ->
SubLineService.getLineLevelCustProfile(
preNbsLineLevelConverter.getSubLine( account ))
).doOnNext(subLine ->
PollObject.getSubList().put(accountLevelMtn.getMtn(),
Optional.ofNullable(subLine))
).blockLast();
// the following code will be executed first when all monos are completed
Если ваш код соответствует
if
не требуется для запуска в основном потоке, было бы лучше оставаться реактивным, как уже предлагалось @ chrylis-cautiouslyoptimistic. Использовать
reduce
оператор, чтобы собрать все результаты вместе, создав моно, которое будет завершено, когда будут выполнены все предоставленные моно:
Flux.fromIterable(SomeObject.getAccountLevelActiveList())
.flatMap(account ->
SubLineService.getLineLevelCustProfile(
preNbsLineLevelConverter.getSubLine( account ))
).reduce(PollObject.getSubList(), (subList, subLine) ->
sublist.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine))
).map(subList -> {
// the code here will be executed first when all monos are completed
})
// ... other operators if necessary
// eventually subscribing or returning the monofor further processing
.subscribe();