Как использовать invokeAll(), чтобы все пулы потоков выполняли свою задачу?

    ExecutorService pool=Executors.newFixedThreadPool(7);
        List<Future<Hotel>> future=new ArrayList<Future<Hotel>>();
        List<Callable<Hotel>> callList = new ArrayList<Callable<Hotel>>();

        for(int i=0;i<=diff;i++){

            String str="2013-"+(liDates.get(i).get(Calendar.MONTH)+1)+"-"+liDates.get(i).get(Calendar.DATE);

            callList.add(new HotelCheapestFare(str));

        }       
     future=pool.invokeAll(callList);
for(int i=0;i<=future.size();i++){

        System.out.println("name is:"+future.get(i).get().getName());
    }

Теперь я хочу, чтобы бассейн invokeAll все задачи, прежде чем попасть в цикл for, но когда я запускаю эту программу для цикла выполняется до того, как invokeAll и выбрасывает это исключение:

java.util.concurrent.ExecutionException: java.lang.NullPointerException at 
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at  
java.util.concurrent.FutureTask.get(Unknown Source) at 
com.mmt.freedom.cheapestfare.TestHotel.main(TestHotel.java:6‌​5)

Caused by: java.lang.NullPointerException at 
com.mmt.freedom.cheapestfare.HotelCheapestFare.getHotelCheap‌estFare(HotelCheapes‌​tFare.java:166) 
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:219)
at com.mmt.freedom.cheapestfare.HotelCheapestFare.call(HotelChe‌​apestFare.java:1) 
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source) at java.util.concurrent.FutureTask.run(Unknown Source) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) atjava.util.concurrent.ThreadPoolExecutor$Worker.run(Unknow‌​n Source)
at java.lang.Thread.run

3 ответа

Решение

Способ ExecutorService работает то, что когда вы звоните invokeAll он ожидает завершения всех задач:

Выполняет заданные задачи, возвращая список Фьючерсов с их статусом и результатами, когда все выполнено. Future.isDone() имеет значение true для каждого элемента возвращаемого списка. Обратите внимание, что завершенная задача могла быть завершена либо нормально, либо с помощью исключения. Результаты этого метода не определены, если данная коллекция была изменена во время выполнения этой операции. 1(выделение добавлено)

Это означает, что все ваши задачи выполнены, но некоторые могут выдать исключение. Это исключение является частью Future - звонит get вызывает исключение, которое будет переброшено, завернутый в ExecutionException,

От тебя стека

java.util.concurrent.ExecutionException: java.lang.NullPointerException at
java.util.concurrent.FutureTask$Sync.innerGet(Unknown Source) at
java.util.concurrent.FutureTask.get(Unknown Source) at 
                                ^^^ <-- from get

Вы можете видеть, что это действительно так. Одна из ваших задач провалилась с NPE. ExecutorService поймал исключение и рассказывает вам об этом, бросая ExecutionException когда ты звонишь Future.get,

Теперь, если вы хотите выполнять задачи по мере их выполнения, вам нужно ExecutorCompletionService, Это действует как BlockingQueue это позволит вам опрашивать задачи по мере их завершения.

public static void main(String[] args) throws Exception {
    final ExecutorService executorService = Executors.newFixedThreadPool(10);
    final ExecutorCompletionService<String> completionService = new ExecutorCompletionService<>(executorService);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            for (int i = 0; i < 100; ++i) {
                try {
                    final Future<String> myValue = completionService.take();
                    //do stuff with the Future
                    final String result = myValue.get();
                    System.out.println(result);
                } catch (InterruptedException ex) {
                    return;
                } catch (ExecutionException ex) {
                    System.err.println("TASK FAILED");
                }
            }
        }
    });
    for (int i = 0; i < 100; ++i) {
        completionService.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                if (Math.random() > 0.5) {
                    throw new RuntimeException("FAILED");
                }
                return "SUCCESS";
            }
        });
    }
    executorService.shutdown();
}

В этом примере у меня есть одна задача, которая вызывает take на ExecutorCompletionService который получает Futureс момента их появления, а затем я отправляю задачи ExecutorCompletionService,

Это позволит вам получить невыполненную задачу, как только она выйдет из строя, вместо того, чтобы ждать, пока все задачи потерпят неудачу или не удастся выполнить вместе.

Единственное осложнение состоит в том, что трудно сказать потоку опроса, что все задачи выполнены, поскольку все теперь асинхронно. В этом случае я использовал знание того, что будет отправлено 100 заданий, поэтому нужно всего лишь 100 опросов. Более общим способом было бы собрать Futureс из submit метод, а затем зациклить их, чтобы увидеть, все ли завершено.

Future.get() выбрасывает ниже исключений.

CancellationException - если расчет был отменен

ExecutionException - если в результате вычисления возникло исключение

InterruptedException - если текущий поток был прерван во время ожидания

Поймай все эти исключения при звонке get() метод.

Я смоделировал деление на ноль исключения для некоторых Callable задачи, но исключение в одном Callable не влияет на других Callable задачи, представленные ExecutorService если вы поймете выше трех исключений, как показано в примере кода.

Пример кода:

import java.util.concurrent.*;
import java.util.*;

public class InvokeAllUsage{
    public InvokeAllUsage(){
        System.out.println("creating service");
        ExecutorService service = Executors.newFixedThreadPool(10);

        List<MyCallable> futureList = new ArrayList<MyCallable>();
        for ( int i=0; i<10; i++){
            MyCallable myCallable = new MyCallable((long)i+1);
            futureList.add(myCallable);
        }
        System.out.println("Start");
        try{
            List<Future<Long>> futures = service.invokeAll(futureList);  
            for(Future<Long> future : futures){
                try{
                    System.out.println("future.isDone = " + future.isDone());
                    System.out.println("future: call ="+future.get());
                }
                catch (CancellationException ce) {
                    ce.printStackTrace();
                } catch (ExecutionException ee) {
                    ee.printStackTrace();
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt(); // ignore/reset
                }
            }
        }catch(Exception err){
            err.printStackTrace();
        }
        System.out.println("Completed");
        service.shutdown();
    }
    public static void main(String args[]){
        InvokeAllUsage demo = new InvokeAllUsage();
    }
    class MyCallable implements Callable<Long>{
        Long id = 0L;
        public MyCallable(Long val){
            this.id = val;
        }
        public Long call(){

            if ( id % 5 == 0){
                id = id / 0;
            }           
            return id;
        }
    }
}

выход:

creating service
Start
future.isDone = true
future: call =1
future.isDone = true
future: call =2
future.isDone = true
future: call =3
future.isDone = true
future: call =4
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
        at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
future.isDone = true
future: call =6
future.isDone = true
future: call =7
future.isDone = true
future: call =8
future.isDone = true
future: call =9
future.isDone = true
java.util.concurrent.ExecutionException: java.lang.ArithmeticException: / by zero
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:188)
        at InvokeAllUsage.<init>(InvokeAllUsage.java:20)
        at InvokeAllUsage.main(InvokeAllUsage.java:37)
Caused by: java.lang.ArithmeticException: / by zero
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:47)
        at InvokeAllUsage$MyCallable.call(InvokeAllUsage.java:39)
        at java.util.concurrent.FutureTask.run(FutureTask.java:262)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Completed

invokeAll - это метод блокировки. Это означает, что JVM не перейдет к следующей строке, пока все потоки не будут завершены. Поэтому я думаю, что что-то не так с вашим будущим результатом.

System.out.println("name is:"+future.get(i).get().getName());

из этой строки я думаю, что некоторые фьючерсы не имеют результата и могут быть нулевыми, поэтому вы должны проверить свой код, если есть некоторые фьючерсы нулевые, если это так, получить if перед выполнением этой строки.

Другие вопросы по тегам