Идеи, чтобы избежать трюка на CyclicBarrier
Я провел несколько тестов с параллельной обработкой и создал программу, которая с учетом матрицы целых чисел пересчитывает значение каждой позиции на основе соседей.
Мне нужна была копия матрицы, чтобы значения не были переопределены и использовались CyclicBarrier
объединить результаты, когда частичные проблемы были решены:
CyclicBarrier cyclic_barrier = new CyclicBarrier(n_tasks + 1, new Runnable() {
public void run() {
ParallelProcess.mergeResult();
}
});
ParallelProcess p = new ParallelProcess(cyclic_barrier, n_rows, r_cols); // init
Каждой задаче назначается часть матрицы: я делю ее на равные части по строкам. Но может случиться так, что деления не будут точными, поэтому будет небольшой фрагмент, соответствующий последнему ряду, который не будет передан в пул потоков.
Пример: если у меня есть 16
строки и n_tasks = 4
нет проблем, все 4 будут представлены в пул. Но если бы я 18
вместо этого будут представлены первые 16, но не последние два.
Поэтому я заставляю подчиниться, если это произойдет. Ну, на самом деле я не отправляю, потому что я использую фиксированный пул потоков, который я создал следующим образом ExecutorService e = Executors.newFixedThreadPool(n_tasks)
, Так как все слоты в пуле заняты и потоки заблокированы барьером (mybarrier.await()
называется в run
метод) Я не мог представить его в пул, поэтому я просто использовал Thread.start()
,
Давайте перейдем к делу. Поскольку мне нужно принять во внимание CyclicBarrier
вероятность того, что этот кусок остается, количество партий должно быть увеличено на 1.
Но если этого не произойдет, у меня не хватит одной стороны, чтобы вызвать барьер.
Какое у меня решение?:
if (lower_limit != n_rows) { // the remaining chunk to be processed
Thread t = new Thread(new ParallelProcess(lower_limit, n_rows));
t.start();
t.join();
}
else {
cyclic_barrier.await();
}
Я чувствую, что я обманываю при использовании cyclic_barrier.await()
трюк, чтобы поднять барьер силой.
Есть ли другой способ решить эту проблему, чтобы мне не пришлось делать то, что я делаю?
1 ответ
Хотя это не отвечает на ваш вопрос о CyclicBarriers, могу ли я порекомендовать использовать Phaser? Это имеет возможность включать в себя количество сторон, а также позволяет запустить mergeResult
когда фаза срабатывает.
Поэтому, прежде чем выполнять асинхронный расчет, просто register
, Тогда внутри этого расчета поток прибудет на phaser
, Когда все потоки прибывают, это продвинет фазу и может вызвать переопределенный метод onAdvance
,
Представление:
ParallelProcess process = new ParallelProcess(lower_limit, n_rows));
phaser.register();
executor.submit(process);
Процессор
public void run(){
//do stuff
phaser.arrive();
}
Фазер
Phaser phaser = new Phaser(){
protected boolean onAdvance(int phase, int registeredParties) {
ParallelProcess.mergeResult();
return true;
}
}