Как использовать invokeAll(), чтобы все пулы потоков выполняли свою задачу?
ExecutorService pool=Executors.newFixedThreadPool(7);
List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();
for(int i=0;i<=diff;i++){
String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);
callList.add(new HotelCheapestFare(str));
}
future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++){
System.out.println("name is:"+future.get(i).get().getName());
}
Теперь я хочу, чтобы бассейн invokeAll
все задачи, прежде чем попасть в цикл for, но когда я запускаю эту программу для цикла выполняется до того, как invokeAll
и выбрасывает это исключение:
java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:65)
Caused by: java.lang.NullPointerException at
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheapestFare(HotelCheapestFare.java:166)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelCheapestFare.java:1)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run
3 ответа
Способ ExecutorService
работает то, что когда вы звоните invokeAll
он ожидает завершения всех задач:
Выполняет заданные задачи, возвращая список Фьючерсов с их статусом и результатами, когда все выполнено. Future.isDone() имеет значение true для каждого элемента возвращаемого списка. Обратите внимание, что завершенная задача могла быть завершена либо нормально, либо с помощью исключения. Результаты этого метода не определены, если данная коллекция была изменена во время выполнения этой операции. 1(выделение добавлено)
Это означает, что все ваши задачи выполнены, но некоторые могут выдать исключение. Это исключение является частью Future
- звонит get
вызывает исключение, которое будет переброшено, завернутый в ExecutionException
,
От тебя стека
java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at
^^^ <-- from get
Вы можете видеть, что это действительно так. Одна из ваших задач провалилась с NPE. ExecutorService
поймал исключение и рассказывает вам об этом, бросая ExecutionException
когда ты звонишь Future.get
,
Теперь, если вы хотите выполнять задачи по мере их выполнения, вам нужно ExecutorCompletionService
, Это действует как BlockingQueue
это позволит вам опрашивать задачи по мере их завершения.
public static void main(String[] args) throws Exception {
final ExecutorService executorService = Executors.newFixedThreadPool(10);
final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
executorService.submit(new Runnable() {
@Override
public void run() {
for (int i = 0; i < 100; ++i) {
try {
final Future<String> myValue = completionService.take();
//do stuff with the Future
final String result = myValue.get();
System.out.println(result);
} catch (InterruptedException ex) {
return;
} catch (ExecutionException ex) {
System.err.println("TASK FAILED");
}
}
}
});
for (int i = 0; i < 100; ++i) {
completionService.submit(new Callable<String>() {
@Override
public String call() throws Exception {
if (Math.random() > 0.5) {
throw new RuntimeException("FAILED");
}
return "SUCCESS";
}
});
}
executorService.shutdown();
}
В этом примере у меня есть одна задача, которая вызывает take
на ExecutorCompletionService
который получает Future
с момента их появления, а затем я отправляю задачи ExecutorCompletionService
,
Это позволит вам получить невыполненную задачу, как только она выйдет из строя, вместо того, чтобы ждать, пока все задачи потерпят неудачу или не удастся выполнить вместе.
Единственное осложнение состоит в том, что трудно сказать потоку опроса, что все задачи выполнены, поскольку все теперь асинхронно. В этом случае я использовал знание того, что будет отправлено 100 заданий, поэтому нужно всего лишь 100 опросов. Более общим способом было бы собрать Future
с из submit
метод, а затем зациклить их, чтобы увидеть, все ли завершено.
Future.get() выбрасывает ниже исключений.
CancellationException
- если расчет был отменен
ExecutionException
- если в результате вычисления возникло исключение
InterruptedException
- если текущий поток был прерван во время ожидания
Поймай все эти исключения при звонке get()
метод.
Я смоделировал деление на ноль исключения для некоторых Callable
задачи, но исключение в одном Callable
не влияет на других Callable
задачи, представленные ExecutorService
если вы поймете выше трех исключений, как показано в примере кода.
Пример кода:
import java.util.concurrent.*;
import java.util.*;
public class InvokeAllUsage{
public InvokeAllUsage(){
System.out.println("creating service");
ExecutorService service = Executors.newFixedThreadPool(10);
List<MyCallable> futureList = new ArrayList<MyCallable>();
for ( int i=0; i<10; i++){
MyCallable myCallable = new MyCallable((long)i+1);
futureList.add(myCallable);
}
System.out.println("Start");
try{
List<Future<Long>> futures = service.invokeAll(futureList);
for(Future<Long> future : futures){
try{
System.out.println("future.isDone = " + future.isDone());
System.out.println("future: call ="+future.get());
}
catch (CancellationException ce) {
ce.printStackTrace();
} catch (ExecutionException ee) {
ee.printStackTrace();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt(); // ignore/reset
}
}
}catch(Exception err){
err.printStackTrace();
}
System.out.println("Completed");
service.shutdown();
}
public static void main(String args[]){
InvokeAllUsage demo = new InvokeAllUsage();
}
class MyCallable implements Callable<Long>{
Long id = 0L;
public MyCallable(Long val){
this.id = val;
}
public Long call(){
if ( id % 5 == 0){
id = id / 0;
}
return id;
}
}
}
выход:
creating service
Start
future.isDone = true
future: call =1
future.isDone = true
future: call =2
future.isDone = true
future: call =3
future.isDone = true
future: call =4
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
future.isDone = true
future: call =6
future.isDone = true
future: call =7
future.isDone = true
future: call =8
future.isDone = true
future: call =9
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:188)
at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
at java.util.concurrent.FutureTask.run(FutureTask.java:262)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
Completed
invokeAll - это метод блокировки. Это означает, что JVM не перейдет к следующей строке, пока все потоки не будут завершены. Поэтому я думаю, что что-то не так с вашим будущим результатом.
System.out.println("name is:"+future.get(i).get().getName());
из этой строки я думаю, что некоторые фьючерсы не имеют результата и могут быть нулевыми, поэтому вы должны проверить свой код, если есть некоторые фьючерсы нулевые, если это так, получить if перед выполнением этой строки.