Проблема множественного параллелизма с будущим объектом

Я пытаюсь порождать несколько потоков и помещать их в список по мере их выполнения. Когда они завершат свою обработку, я бы хотел собрать их результаты для презентации. Таким образом, у меня может быть список, содержащий много потоков, а затем, как только они станут доступны, я могу позвонить future.get и использовать их информацию обратного вызова.

Почему-то мне не хватает многих результатов. Когда я перехожу через код, f.get() проходит, когда это не должно быть, и я не могу понять, почему.

Мой код выглядит следующим образом:

     public class ThreadTesterRunner {
            static List<Integer> randoms = new ArrayList<>();

            public static void main(String[] args) throws InterruptedException {
                final Phaser cb = new Phaser();
                ThreadRunner tr = new ThreadRunner(cb);
                Thread t = new Thread(tr, "Thread Runner");
                t.start();

                boolean process = true;
                // wait until all threads process, then print reports
                while (process){
                    if(tr.isFinished()){
                        System.out.println("Print metrics");
                        process = false;

                    }
                    Thread.sleep(1000);
                }
            }
        }


      class ThreadRunner implements Runnable {
            private ExecutorService executorService = Executors.newFixedThreadPool(10);
            private final Phaser barrier;
            private boolean finished=false;

            public ThreadRunner(Phaser phaser) {this.barrier = phaser;}

            public void run(){
                try {
                    List<Future<Integer>> list = new ArrayList<>();
                    boolean stillLoop = true; int i = 0;

                    final Phaser p = this.barrier;
                    Callable<Integer> task = new Callable<Integer>() {
                           public Integer call() throws Exception {
                               return new Reader().doRun(p);
                           }
                    };

                    List<Integer> randoms = new ArrayList<>();
                    Integer size;
                    while (stillLoop){
                        System.out.println("i "+i);
                        list.add(executorService.submit(task));

                        for(Future<Integer> f: list){
                            if(f.isDone()){
                                size = f.get();
                                System.out.println("size "+size);
                                list.remove(f);
                            } else {
                                  // keep processing
                            }
                        }
                        if(i == 2){
                            System.out.println("breaking out of loop");
                            stillLoop = false;
                        }
                        i++;
                    }
                    this.barrier.awaitAdvance(0);
                    this.finished=true;
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
            public boolean isFinished(){
                return this.finished;
            }
        }

class Reader {

    private Phaser readBarrier;
    private ExecutorService executorService = Executors.newFixedThreadPool(20);

    public Reader() {
    }    

    Random randomGenerator = new Random();

    public Integer doRun(Phaser phaser) throws Exception {
        phaser.register();
        this.readBarrier = phaser;
        System.out.println("Reading...");
        int i;

        int r = randomGenerator.nextInt(100);
        System.out.println("r "+r);
        ThreadTesterRunner.randoms.add(r);

        int a = this.readBarrier.arrive();
        return r; //i;
    }
}

Есть идеи, почему это может происходить?

РЕДАКТИРОВАТЬ:

Хорошо, я думаю, что у меня все работает

class ThreadRunner implements Runnable {
    // static int timeOutTime = 2;
    private ExecutorService executorService = Executors.newFixedThreadPool(10);
    private final Phaser barrier;
    private boolean finished = false;

    public ThreadRunner(Phaser phaser) {
        this.barrier = phaser;
    }

    public void run() {
        try {
            List<Future<Integer>> list = new CopyOnWriteArrayList<>(); 
            boolean stillLoop = true;
            int i = 0;

            final Phaser p = this.barrier;
            Callable<Integer> readerTask = new Callable<Integer>() {
                public Integer call() throws Exception {
                    return new Reader().doRun(p);
                }
            };

            List<Integer> randoms = new ArrayList<>();
            Integer size;
            while (stillLoop) {
                if (i <= 2) {
                    list.add(executorService.submit(readerTask));
                }

                if (!list.isEmpty()) {
                    for (Future<Integer> f : list) {
                        if (f.isDone()) {
                            size = f.get();
                            randoms.add(size);
                            System.out.println("Process read with a size of "+ size);
                            list.remove(f);
                        } else {
                            // System.out.println("skipping");
                        }
                    }
                } else {
                    stillLoop = false;
                }
                i++;
            }
            System.out.println("at barrier waiting");
            this.barrier.awaitAdvance(0);
            System.out.println("barrier crossed");
            this.finished = true;
        } catch (Exception e1) {
            e1.printStackTrace();
        }
    }

    public boolean isFinished() {
        return this.finished;
    }
}

1 ответ

Результаты: i 0 i 1 i 2 выходит из цикла Чтение... Чтение... r 13 r 44 Чтение... r 78 Печать метрик

Я изменил ArrayList на Vector, поскольку ArrayList не является потокобезопасным, что в конечном итоге приведет к исключению ConcurrentModificationException.

Является ли вышеуказанный вывод тем, что вы ожидаете?

Другие вопросы по тегам