Как остановить основной поток для завершения всех вызовов Mono?

Я делаю несколько моно-вызовов к БД. И результат всех моно-ответов необходим для вычисления окончательного результата, который записывается после объявленной логики моно.

if (SomeObject.getAccountLevelActiveList() != null) {

                SomeObject.getAccountLevelActiveList().parallelStream().forEach(account -> {
                    Mono<SubLine> subLineMono= SubLineService
                            .getLineLevelCustProfile(preNbsLineLevelConverter.getSubLine(account ));
                

                    subLineMono.subscribe(subLine-> PollObject.getSubList()
                            .put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine)));

                });

            }

Но моя основная логика выполняется до того, как моно-результат будет сохранен в PollObject. поэтому я получаю null в PollObject. Поэтому я хочу остановить свой основной поток, пока результаты Mono не будут сохранены в PollObject.

1 ответ

Решение

Если вы хотите остановить основной поток, вы можете использовать блокировку вместо подписки, но сначала вы должны преобразовать List в Flux а потом flatMap-это использование предоставленных Mono. Логика, которая у вас есть в subscribe метод можно переместить в оператор побочного эффекта doOnNext любой из Mono или упаковка Flux:

Flux.fromIterable(SomeObject.getAccountLevelActiveList())
    .flatMap(account ->
        SubLineService.getLineLevelCustProfile(
            preNbsLineLevelConverter.getSubLine( account ))
    ).doOnNext(subLine ->
        PollObject.getSubList().put(accountLevelMtn.getMtn(),
            Optional.ofNullable(subLine))
    ).blockLast();
    // the following code will be executed first when all monos are completed

Если ваш код соответствует ifне требуется для запуска в основном потоке, было бы лучше оставаться реактивным, как уже предлагалось @ chrylis-cautiouslyoptimistic. Использовать reduce оператор, чтобы собрать все результаты вместе, создав моно, которое будет завершено, когда будут выполнены все предоставленные моно:

Flux.fromIterable(SomeObject.getAccountLevelActiveList())
    .flatMap(account ->
        SubLineService.getLineLevelCustProfile(
            preNbsLineLevelConverter.getSubLine( account ))
    ).reduce(PollObject.getSubList(), (subList, subLine) ->
        sublist.put(accountLevelMtn.getMtn(), Optional.ofNullable(subLine))
    ).map(subList -> {
        // the code here will be executed first when all monos are completed
    })
    // ... other operators if necessary
    // eventually subscribing or returning the monofor further processing 
    .subscribe();
Другие вопросы по тегам