Идеи, чтобы избежать трюка на CyclicBarrier

Я провел несколько тестов с параллельной обработкой и создал программу, которая с учетом матрицы целых чисел пересчитывает значение каждой позиции на основе соседей.

Мне нужна была копия матрицы, чтобы значения не были переопределены и использовались CyclicBarrier объединить результаты, когда частичные проблемы были решены:

CyclicBarrier cyclic_barrier = new CyclicBarrier(n_tasks + 1, new Runnable() {
    public void run() {
        ParallelProcess.mergeResult();
    }
});
ParallelProcess p = new ParallelProcess(cyclic_barrier, n_rows, r_cols); // init

Каждой задаче назначается часть матрицы: я делю ее на равные части по строкам. Но может случиться так, что деления не будут точными, поэтому будет небольшой фрагмент, соответствующий последнему ряду, который не будет передан в пул потоков.

Пример: если у меня есть 16 строки и n_tasks = 4 нет проблем, все 4 будут представлены в пул. Но если бы я 18 вместо этого будут представлены первые 16, но не последние два.

Поэтому я заставляю подчиниться, если это произойдет. Ну, на самом деле я не отправляю, потому что я использую фиксированный пул потоков, который я создал следующим образом ExecutorService e = Executors.newFixedThreadPool(n_tasks), Так как все слоты в пуле заняты и потоки заблокированы барьером (mybarrier.await() называется в run метод) Я не мог представить его в пул, поэтому я просто использовал Thread.start(),

Давайте перейдем к делу. Поскольку мне нужно принять во внимание CyclicBarrier вероятность того, что этот кусок остается, количество партий должно быть увеличено на 1.

Но если этого не произойдет, у меня не хватит одной стороны, чтобы вызвать барьер.

Какое у меня решение?:

if (lower_limit != n_rows) { // the remaining chunk to be processed
    Thread t = new Thread(new ParallelProcess(lower_limit, n_rows));
    t.start();
    t.join();
}
else {
    cyclic_barrier.await();
}

Я чувствую, что я обманываю при использовании cyclic_barrier.await() трюк, чтобы поднять барьер силой.

Есть ли другой способ решить эту проблему, чтобы мне не пришлось делать то, что я делаю?

1 ответ

Хотя это не отвечает на ваш вопрос о CyclicBarriers, могу ли я порекомендовать использовать Phaser? Это имеет возможность включать в себя количество сторон, а также позволяет запустить mergeResult когда фаза срабатывает.

Поэтому, прежде чем выполнять асинхронный расчет, просто register, Тогда внутри этого расчета поток прибудет на phaser, Когда все потоки прибывают, это продвинет фазу и может вызвать переопределенный метод onAdvance,

Представление:

ParallelProcess process = new ParallelProcess(lower_limit, n_rows));
phaser.register();
executor.submit(process);

Процессор

public void run(){
   //do stuff
   phaser.arrive();
}

Фазер

Phaser phaser = new Phaser(){
    protected boolean onAdvance(int phase, int registeredParties) {
        ParallelProcess.mergeResult(); 
        return true;
    }
}
Другие вопросы по тегам