Блок Java CompletableFuture.complete()
У меня проблема при использовании CompletableFuture в Java. У меня есть 2 запроса на выбор, которые заполняются при получении ответов от сервера.
В соединительной резьбе (THREAD-1) (использую реактор) я использую:
if(hasException) {
selectFuture.completeExceptionally(new ClientException(errorCode));
} else {
System.out.println("Before complete future");
selectFuture.complete(result);
System.out.println("After complete future");
}
А в другой теме (THREAD-2) я использую:
CompleteFuture.allOf(allSelect).whenComplete((aVoid, throwable) -> {
System.out.println("Receive all future");
// Do sth here
});
Моя ситуация такова, что система распечатывает "Получить все будущее", но THREAD-1 блокируется при вызове future.complete(result);
Он не может выйти из этой команды. Если в THREAD-2, я использую CompletableFuture.allOf(allOfSelect).get()
, THREAD-1 будет работать правильно. Но используя CompletableFuture.get()
снижает производительность, поэтому я хотел бы использовать CompletableFuture.whenComplete()
,
Кто-нибудь может помочь мне объяснить причину блокировки?
Спасибо!
1 ответ
complete
вызов вызывает все зависимые CompletionStage
s.
Так что, если вы ранее зарегистрировали BiConsumer
с whenComplete
, complete
вызовет его в вызывающем потоке. В вашем случае звонок на complete
вернется, когда BiConsumer
ты перешел на whenComplete
отделки. Это описано в классе Javadoc
Действия, предоставляемые для зависимых завершений неасинхронных методов, могут выполняться потоком, который завершает текущий
CompletableFuture
или любым другим вызывающим абонентом метода завершения.
(другим вызывающим является противоположная ситуация, когда поток, вызывающий whenComplete
будет на самом деле применить BiConsumer
если цель CompletableFuture
уже было завершено.)
Вот небольшая программа для иллюстрации поведения:
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
future.whenComplete((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});
System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
}
Это напечатает
Thread[main,5,main]
before sleep, executed in thread Thread[main,5,main]
after sleep, executed in thread Thread[main,5,main]
done
показывая, что BiConsumer
был применен в основном потоке, который называется complete
,
Ты можешь использовать whenCompleteAsync
форсировать исполнение BiConsumer
в отдельной теме.
[...] который выполняет данное действие, используя средство асинхронного выполнения по умолчанию этого этапа, когда этот этап завершается.
Например,
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = new CompletableFuture<String>();
CompletableFuture<?> done = future.whenCompleteAsync((r, t) -> {
System.out.println("before sleep, executed in thread " + Thread.currentThread());
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("after sleep, executed in thread " + Thread.currentThread());
});
System.out.println(Thread.currentThread());
future.complete("completed");
System.out.println("done");
done.get();
}
распечатает
Thread[main,5,main]
done
before sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
after sleep, executed in thread Thread[ForkJoinPool.commonPool-worker-1,5,main]
показывая, что BiConsumer
был применен в отдельной теме.