Java: синхронизация потоков ExecutorService с CountDownLatch вызывает мертвую блокировку?

Я написал игру жизни для практики программирования. Есть 3 разных реализации генератора. Первый: один основной поток + N вложенных потоков, второй: SwingWorker + N вложенных потоков, третий: SwingWorker + ExecutorService. N - это число доступных процессоров или определенных пользователем. Первые две реализации работают нормально, с одним и несколькими потоками. Реализация с ExecutorServise работает нормально с одним потоком, но блокируется более чем с одним. Я перепробовал все, но не могу найти решение.

Вот код реализации рабочего процесса (второй):

    package example.generator;

    import javax.swing.SwingWorker;

    /**
     * AbstractGenerator implementation 2: SwingWorker + sub threads.
     * 
     * @author Dima
     */
    public final class WorldGenerator2 extends AbstractGenerator {


        /**
         * Constructor.
         * @param gamePanel The game panel
         */
        public WorldGenerator2() {
            super();
        }




        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startGenerationProcess()
         */
        @Override
        protected void startGenerationProcess() {
            final SwingWorker<Void, Void> worker = this.createWorker();
            worker.execute();
        }




        /**
         * Creates a swing worker for the generation process.
         * @return The swing worker
         */
        private SwingWorker<Void, Void> createWorker() {
            return new SwingWorker<Void, Void>() {

                @Override
                protected Void doInBackground() throws InterruptedException {
                    WorldGenerator2.this.generationProcessing();
                    return null;
                }
            };
        }





        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startFirstStep()
         */
        @Override
        public void startFirstStep() throws InterruptedException {
            this.getQueue().addAll(this.getLivingCells());
            for (int i = 0; i < this.getCoresToUse(); i++) {
                final Thread thread = new Thread() {
                    @Override
                    public void run() {
                        WorldGenerator2.this.fistStepProcessing();
                    }
                };
                thread.start();
                thread.join();
            }
        }




        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startSecondStep()
         */
        @Override
        protected void startSecondStep() throws InterruptedException {
            this.getQueue().addAll(this.getCellsToCheck());
            for (int i = 0; i < this.getCoresToUse(); i++) {
                final Thread thread = new Thread() {

                    @Override
                    public void run() {
                        WorldGenerator2.this.secondStepProcessing();
                    }
                };
                thread.start();
                thread.join();
            }
        }

    }

Вот код не работающей реализации со службой executor:

    package example.generator;

    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import javax.swing.SwingWorker;

    /**
     * AbstractGenerator implementation 3: SwingWorker + ExecutorService.
     * 
     * @author Dima
     */
    public final class WorldGenerator3 extends AbstractGenerator {


        private CountDownLatch  countDownLatch;
        private ExecutorService executor;


        /**
         * Constructor.
         * @param gamePanel The game panel
         */
        public WorldGenerator3() {
            super();
        }




        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startGenerationProcess()
         */
        @Override
        protected void startGenerationProcess() {
            this.executor = Executors.newFixedThreadPool(this.getCoresToUse());
            final SwingWorker<Void, Void> worker = this.createWorker();
            worker.execute();
        }




        /**
         * Creates a swing worker for the generation process.
         * @return The swing worker
         */
        private SwingWorker<Void, Void> createWorker() {
            return new SwingWorker<Void, Void>() {

                @Override
                protected Void doInBackground() throws InterruptedException {
                    WorldGenerator3.this.generationProcessing();
                    return null;
                }
            };
        }




        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startFirstStep()
         */
        @Override
        public void startFirstStep() throws InterruptedException {
            this.getQueue().addAll(this.getLivingCells());
            this.countDownLatch = new CountDownLatch(this.getCoresToUse());
            for (int i = 0; i < this.getCoresToUse(); i++) {    
                this.executor.execute(new Runnable() {  
                    @Override
                    public void run() {
                        WorldGenerator3.this.fistStepProcessing();
                        WorldGenerator3.this.countDownLatch.countDown();
                    }
                });
            }
            this.countDownLatch.await();

        }




        /* (non-Javadoc)
         * @see main.generator.AbstractGenerator#startSecondStep()
         */
        @Override
        protected void startSecondStep() throws InterruptedException {
            this.getQueue().addAll(this.getCellsToCheck());
            this.countDownLatch = new CountDownLatch(this.getCoresToUse());
            for (int i = 0; i < this.getCoresToUse(); i++) {
                this.executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        WorldGenerator3.this.secondStepProcessing();
                        WorldGenerator3.this.countDownLatch.countDown();
                    }
                });
            }
            this.countDownLatch.await();

        }
    }

Здесь вы можете скачать образец моего приложения с небольшим лаунчером. он печатает только результат итерации на консоли: Ссылка


Теперь мой код выглядит так:

/* (non-Javadoc)
 * @see main.generator.AbstractGenerator#startFirstStep()
 */
@Override
public void startFirstStep() throws InterruptedException {

    this.getQueue().addAll(this.getLivingCells());      

    final ArrayList<Callable<Void>> list = new ArrayList<Callable<Void>>(this.getCoresToUse());

    for (int i = 0; i < this.getCoresToUse(); i++) {

        list.add(new Callable<Void>() {

                @Override
                public Void call() throws Exception {
                    WorldGenerator3.this.fistStepProcessing();
                    return null;
                }
            }
        );          
    }

    this.executor.invokeAll(list);
}

Но здесь опять та же проблема. Если я запускаю его с одним ядром (потоком), проблем не возникает. Если я установлю количество ядер более одного, оно будет заблокировано. В моем первом вопросе есть ссылка на пример, который вы можете запустить (в eclipse). Может быть, я упустил что-то в предыдущем коде.

2 ответа

Во всех вариантах вы выполняете потоки последовательно, а не параллельно, потому что вы join а также await внутри цикла. Это означает, что цикл for не может перейти к следующей итерации, пока только что завершившийся поток не завершится. Это означает, что в любой момент времени может существовать только один поток - либо основной, либо один поток, созданный в текущей итерации цикла. Если вы хотите присоединиться к нескольким потокам, вы должны собрать ссылки на них, а затем, за пределами цикла, где вы их все запустили, ввести другой цикл, в котором вы присоединитесь к каждому из них.

Что касается использования CountDownLatch в Executors вариант, то, что было сказано для потоков, здесь используется для защелки: не используйте экземпляр var; используйте локальный список, который собирает все защелки и ожидает их в отдельном цикле.

Но вы не должны использовать CountDownLatch во-первых: вы должны поместить все свои параллельные задачи в список Callableи позвоните ExecutorService.invokeAll с этим. Он будет автоматически блокироваться, пока все задачи не будут выполнены.

Я нахожу ваше использование услуг Executors немного странным...

Т.е. идея состоит в том, чтобы иметь Executor с пулом потоков, размер которых обычно связан с количеством ядер, поддерживаемых вашим процессором. Затем вы отправляете исполнителю любое количество параллельных задач, позволяя ему решать, что и когда выполнять, и какой доступный поток из его пула.

Что касается CountDownLatch... Почему бы не использовать ExecutorService.invokeAll? Этот метод будет блокировать, пока все представленные задачи не будут выполнены или не истечет время ожидания. Так что он будет делать подсчет работы, оставленной от вашего имени. Или CompletionService, который "отделяет производство новых асинхронных задач от потребления результатов выполненных задач", если вы хотите использовать результат Task, как только он станет доступным, т.е. не ждать, пока все задачи завершатся первыми.

Что-то вроде

    private static final int WORKER_THREAD_COUNT_DEFAULT = Runtime.getRuntime().availableProcessors() * 2;

    ExecutorService executor = Executors.newFixedThreadPool(WORKER_THREAD_COUNT);

    // your tasks may or may not return result so consuming invokeAll return value may not be necessary in your case 
    List<Future<T>> futuresResult = executor.invokeAll(tasksToRunInParallel, EXECUTE_TIMEOUT,
                TimeUnit.SECONDS);
Другие вопросы по тегам