Блок Java CompletableFuture.complete()

У меня проблема при использовании CompletableFuture в Java. У меня есть 2 запроса на выбор, которые заполняются при получении ответов от сервера.

В соединительной резьбе (THREAD-1) (использую реактор) я использую:

if(hasException) {
   selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
   System.out.println("Before complete future");
   selectFuture.complete(result);
   System.out.println("After complete future");
}

А в другой теме (THREAD-2) я использую:

CompleteFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
   System.out.println("Receive all future");
   // Do sth here
});

Моя ситуация такова, что система распечатывает "Получить все будущее", но THREAD-1 блокируется при вызове future.complete(result); Он не может выйти из этой команды. Если в THREAD-2, я использую CompletableFuture.allOf(allOfSelect).get(), THREAD-1 будет работать правильно. Но используя CompletableFuture.get() снижает производительность, поэтому я хотел бы использовать CompletableFuture.whenComplete(),

Кто-нибудь может помочь мне объяснить причину блокировки?

Спасибо!

1 ответ

Решение

complete вызов вызывает все зависимые CompletionStage s.

Так что, если вы ранее зарегистрировали BiConsumer с whenComplete, complete вызовет его в вызывающем потоке. В вашем случае звонок на complete вернется, когда BiConsumer ты перешел на whenComplete отделки. Это описано в классе Javadoc

Действия, предоставляемые для зависимых завершений неасинхронных методов, могут выполняться потоком, который завершает текущий CompletableFuture или любым другим вызывающим абонентом метода завершения.

(другим вызывающим является противоположная ситуация, когда поток, вызывающий whenComplete будет на самом деле применить BiConsumer если цель CompletableFuture уже было завершено.)

Вот небольшая программа для иллюстрации поведения:

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = new CompletableFuture<String>();
    future.whenComplete((r, t) -> {
        System.out.println("before sleep, executed in thread " + Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after sleep, executed in thread " + Thread.currentThread());
    });

    System.out.println(Thread.currentThread());
    future.complete("completed");
    System.out.println("done");
}

Это напечатает

Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done

показывая, что BiConsumer был применен в основном потоке, который называется complete,

Ты можешь использовать whenCompleteAsync форсировать исполнение BiConsumer в отдельной теме.

[...] который выполняет данное действие, используя средство асинхронного выполнения по умолчанию этого этапа, когда этот этап завершается.

Например,

public static void main(String[] args) throws Exception {
    CompletableFuture<String> future = new CompletableFuture<String>();
    CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
        System.out.println("before sleep, executed in thread " + Thread.currentThread());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("after sleep, executed in thread " + Thread.currentThread());
    });

    System.out.println(Thread.currentThread());
    future.complete("completed");
    System.out.println("done");
    done.get();
}

распечатает

Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]

показывая, что BiConsumer был применен в отдельной теме.

Другие вопросы по тегам