Понимание фазера в Java с примером
Я пытаюсь понять Фазер в Java. Я написал пример, который застрял заранее, ожидая прибытия других сторон.
Насколько я понимаю, Phaser используется как барьер для многократного использования потоков (в отличие от CountdownLatch, который нельзя использовать повторно) с барьерным действием (в отличие от Cyclicbarrier, который используется для совместного использования состояния, Phaser не должен совместно использовать состояние в барьерном действии). Поправь меня, если я ошибаюсь.
Итак, в моем примере я пытаюсь выполнить некоторый случайный код сложения и вычитания в каждом потоке после того, как определенное количество сторон / потоков достигнет барьера. Что я делаю неправильно?
import static java.lang.String.*;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;
public class PhaserUsage implements Callable<String> {
private static final int THREAD_POOL_SIZE = 10;
private final Phaser phaser;
private PhaserUsage(Phaser phaser) {
this.phaser = phaser;
}
public static void main(String a[]) {
ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
CompletionService<String> completionService = new ExecutorCompletionService<>(execService);
Phaser phaser = new Phaser(1);
IntStream.range(0, THREAD_POOL_SIZE)
.forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));
execService.shutdown();
try {
while (!execService.isTerminated()) {
String result = completionService.take().get();
System.out.println(format("Result is: %s", result));
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String call() {
String threadName = Thread.currentThread().getName();
System.out.println(format("Registering...%s",threadName));
phaser.register();
System.out.println(format("Arrive and await advance...%s",threadName));
phaser.arriveAndAwaitAdvance(); // await all creation
int a = 0, b = 1;
Random random = new Random();
for (int i = 0; i < random.nextInt(10000000); i++) {
a = a + b;
b = a - b;
}
System.out.println(format("De-registering...%s",threadName));
phaser.arriveAndDeregister();
return format("Thread %s results: a = %s, b = %s", threadName, a, b);
}
}
3 ответа
Вы инициализируете свой Phaser со значением 1:
Phaser phaser = new Phaser(1);
Это означает, что ваш основной поток - это один из потоков, который вы ожидаете, но он никогда не вызывает Arri ().
Поскольку число ваших потоков фиксировано, вы должны инициализировать Phaser с номером потока и удалить вызовы register().
Проблема в том, что вы не можете позвонить phaser.register()
изнутри регистрируемой задачи. При использовании фазеров всегда следуйте этим двум правилам:
- Только зарегистрированные задачи могут регистрировать другие задачи. Это означает, что задача не может зарегистрироваться.
- Все зарегистрированные задачи должны быть отменены до завершения. Хорошей практикой является обертывание кода с помощью фазера вокруг
finally
блок, который отменяет регистрацию в конце (см. пример).
Вот ваша фиксированная программа (обратите внимание на строку, которая создает фазер):
import static java.lang.String.*;
import java.util.Random;
import java.util.concurrent.*;
import java.util.stream.IntStream;
public class PhaserUsage implements Callable<String> {
private static final int THREAD_POOL_SIZE = 10;
private final Phaser phaser;
private PhaserUsage(Phaser phaser) {
this.phaser = phaser;
}
public static void main(String a[]) {
ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
CompletionService<String> completionService = new ExecutorCompletionService<>(execService);
// since we know beforehand how many tasks we have, initialize the
// number of participants in the constructor; other wise register
// *before* launching the task
Phaser phaser = new Phaser(THREAD_POOL_SIZE);
IntStream.range(0, THREAD_POOL_SIZE)
.forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));
execService.shutdown();
try {
while (!execService.isTerminated()) {
String result = completionService.take().get();
System.out.println(format("Result is: %s", result));
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String call() {
String threadName = Thread.currentThread().getName();
System.out.println(format("Arrive and await advance...%s",threadName));
phaser.arriveAndAwaitAdvance(); // await all creation
int a = 0, b = 1;
Random random = new Random();
for (int i = 0; i < random.nextInt(10000000); i++) {
a = a + b;
b = a - b;
}
System.out.println(format("De-registering...%s",threadName));
phaser.arriveAndDeregister();
return format("Thread %s results: a = %s, b = %s", threadName, a, b);
}
}
Вот рабочий код без phaser.register():
import static java.lang.String.*;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.stream.IntStream;
public class PhaserUsage implements Callable<String> {
private static final int THREAD_POOL_SIZE = 10;
private Phaser phaser;
private PhaserUsage(Phaser phaser) {
this.phaser = phaser;
}
public static void main(String a[]) {
ExecutorService execService = Executors.newFixedThreadPool(THREAD_POOL_SIZE);
CompletionService<String> completionService = new ExecutorCompletionService<>(execService);
Phaser phaser = new Phaser(1);
IntStream.range(0, THREAD_POOL_SIZE)
.forEach(nbr -> completionService.submit(new PhaserUsage(phaser)));
execService.shutdown();
try {
while (!execService.isTerminated()) {
String result = completionService.take().get();
System.out.println(format("Result is: %s", result));
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
}
}
@Override
public String call() {
String threadName = Thread.currentThread().getName();
System.out.println(format("Registering...%s",threadName));
//phaser.register();
System.out.println(format("Arrive and await advance...%s",threadName));
phaser.arriveAndAwaitAdvance(); // await all creation
int a = 0, b = 1;
Random random = new Random();
for (int i = 0; i < random.nextInt(10000000); i++) {
a = a + b;
b = a - b;
}
System.out.println(format("De-registering...%s",threadName));
phaser.arriveAndDeregister();
return format("Thread %s results: a = %s, b = %s", threadName, a, b);
}
}