Java зацикливание потоков с использованием CyclicBarrier
У меня есть программа с такой общей структурой:
init
create CyclicBarrier
initialise all threads, attaching to barrier
*start all threads*
wait for join
display stats
*start all threads*
perform calculation
await barrier
Моя проблема в том, что мне нужен метод run() потоков, чтобы продолжать цикл до тех пор, пока не будет выполнено определенное условие, но делать паузу после каждой итерации, чтобы все потоки синхронизировались.
Я уже пытался прикрепить метод Runnable к барьеру, но в итоге требуется пересоздание и перезапуск каждого потока, что не очень хорошее решение.
Я также пытался использовать метод reset() CyclicBarrier, но это, похоже, вызывает ошибки в существующих потоках, даже если они выполняются после завершения всех потоков.
Мой вопрос:
-Можно ли "сбросить" барьер и заставить все потоки барьера следовать тем же условиям, которые они выполняли до первых вызовов await()?
Или есть другой метод, который я должен использовать для достижения этой цели?
заранее спасибо
2 ответа
После ответа @Totoro ниже приведен небольшой пример кода, который также включает в себя требование "Мне нужен метод потоков (), чтобы цикл продолжался до тех пор, пока не будет выполнено определенное условие, и после каждой итерации приостанавливается, чтобы все потоки синхронизировались". Это делает его довольно быстрым, но, надеюсь, выходные данные программы прояснят пример кода (или я просто должен сделать лучшие примеры).
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class BarrierCalc implements Runnable {
public static final int CALC_THREADS = 3;
private static final AtomicBoolean runCondition = new AtomicBoolean();
private static final AtomicBoolean stopRunning = new AtomicBoolean();
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(CALC_THREADS + 1);
for (int i = 0; i < CALC_THREADS; i++) {
new Thread(new BarrierCalc(barrier)).start();
}
try {
runCondition.set(true);
barrier.await();
showln(0, "STATS!");
barrier.await();
showln(0, "start looping 1");
Thread.sleep(200);
runCondition.set(false);
showln(0, "stop looping 1");
barrier.await();
runCondition.set(true);
barrier.await();
showln(0, "start looping 2");
Thread.sleep(100);
runCondition.set(false);
showln(0, "stop looping 2");
barrier.await();
stopRunning.set(true);
showln(0, "finishing");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}
private static final AtomicInteger calcId = new AtomicInteger();
private CyclicBarrier barrier;
private int id;
public BarrierCalc(CyclicBarrier barrier) {
this.barrier = barrier;
id = calcId.incrementAndGet();
}
public void run() {
showln(id, "waiting for start");
try {
barrier.await(); // display stats
barrier.await(); // start running
int loopNumber = 0;
while (!stopRunning.get()) {
showln(id, "looping " + (++loopNumber));
while (runCondition.get()) {
Thread.sleep(10); // simulate looping
}
showln(id, "synchronizing " + loopNumber);
barrier.await();
showln(id, "synchronized " + loopNumber);
// give main thread a chance to set stopCondition and runCondition
barrier.await();
}
showln(id, "finished");
} catch (Exception e) {
e.printStackTrace();
}
}
private static final long START_TIME = System.currentTimeMillis();
public static void showln(int id, String msg) {
System.out.println((System.currentTimeMillis() - START_TIME) + "\t ID " + id + ": " + msg);
}
}
Помните, что выходные данные программы могут не соответствовать ожидаемому порядку: потокам, которые одновременно записывают данные на один синхронизированный выход (System.out), предоставляется доступ на запись в произвольном порядке.
Barrier.wait() приостановит потоки. Барьер уже находится в главном потоке, ему не нужно другого. В вашем алгоритме выше вы показываете перезапуск потоков после отображения статистики. Вам не нужно этого делать. Если недавно пробудившиеся потоки находятся в цикле, они снова вернутся в барьер.
Вы можете взглянуть на мой пример, где я играл с CyclicBarrier. Здесь каждый воркер производит какие-то вычисления, а на барьере проверяется условие. Если это соответствует условию, все рабочие прекращают вычисления, в противном случае они продолжают:
class Solver {
private static final int REQUIRED_AMOUNT = 100;
private static final int NUMBER_OF_THREADS = 4;
AtomicInteger atomicInteger = new AtomicInteger();
AtomicBoolean continueCalculation = new AtomicBoolean(true);
final CyclicBarrier barrier;
public static void main(String[] args) {
new Solver();
}
class Worker implements Runnable {
int workerId;
Worker(int workerId) {
this.workerId = workerId;
}
public void run() {
try {
while(continueCalculation.get()) {
calculate(workerId);
barrier.await();
}
} catch (Exception ex) {
System.out.println("Finishing " + workerId);
}
}
}
public Solver() {
Runnable barrierAction = () -> {
if (done()) {
continueCalculation.set(false);
}
};
barrier = new CyclicBarrier(NUMBER_OF_THREADS, barrierAction);
List<Thread> threads = new ArrayList(NUMBER_OF_THREADS);
for (int i = 0; i < NUMBER_OF_THREADS; i++) {
Thread thread = new Thread(new Worker(i));
threads.add(thread);
thread.start();
}
}
private void calculate(int workerId) throws InterruptedException {
// Some long-running calculation
Thread.sleep(2000L);
int r = new Random().nextInt(12);
System.out.println("Worker #" + workerId + " added " + r +" = " + atomicInteger.addAndGet(r));
}
private boolean done() {
int currentResult = atomicInteger.get();
boolean collected = currentResult >= REQUIRED_AMOUNT;
System.out.println("=======================================================");
System.out.println("Checking state at the barrier: " + currentResult);
if (collected) {
System.out.println("Required result is reached");
}
System.out.println("=======================================================");
return collected;
}
}