Проблема множественного параллелизма с будущим объектом
Я пытаюсь порождать несколько потоков и помещать их в список по мере их выполнения. Когда они завершат свою обработку, я бы хотел собрать их результаты для презентации. Таким образом, у меня может быть список, содержащий много потоков, а затем, как только они станут доступны, я могу позвонить future.get
и использовать их информацию обратного вызова.
Почему-то мне не хватает многих результатов. Когда я перехожу через код, f.get()
проходит, когда это не должно быть, и я не могу понять, почему.
Мой код выглядит следующим образом:
public class ThreadTesterRunner {
static List<Integer> randoms = new ArrayList<>();
public static void main(String[] args) throws InterruptedException {
final Phaser cb = new Phaser();
ThreadRunner tr = new ThreadRunner(cb);
Thread t = new Thread(tr, "Thread Runner");
t.start();
boolean process = true;
// wait until all threads process, then print reports
while (process){
if(tr.isFinished()){
System.out.println("Print metrics");
process = false;
}
Thread.sleep(1000);
}
}
}
class ThreadRunner implements Runnable {
private ExecutorService executorService = Executors.newFixedThreadPool(10);
private final Phaser barrier;
private boolean finished=false;
public ThreadRunner(Phaser phaser) {this.barrier = phaser;}
public void run(){
try {
List<Future<Integer>> list = new ArrayList<>();
boolean stillLoop = true; int i = 0;
final Phaser p = this.barrier;
Callable<Integer> task = new Callable<Integer>() {
public Integer call() throws Exception {
return new Reader().doRun(p);
}
};
List<Integer> randoms = new ArrayList<>();
Integer size;
while (stillLoop){
System.out.println("i "+i);
list.add(executorService.submit(task));
for(Future<Integer> f: list){
if(f.isDone()){
size = f.get();
System.out.println("size "+size);
list.remove(f);
} else {
// keep processing
}
}
if(i == 2){
System.out.println("breaking out of loop");
stillLoop = false;
}
i++;
}
this.barrier.awaitAdvance(0);
this.finished=true;
} catch (Exception e1) {
e1.printStackTrace();
}
}
public boolean isFinished(){
return this.finished;
}
}
class Reader {
private Phaser readBarrier;
private ExecutorService executorService = Executors.newFixedThreadPool(20);
public Reader() {
}
Random randomGenerator = new Random();
public Integer doRun(Phaser phaser) throws Exception {
phaser.register();
this.readBarrier = phaser;
System.out.println("Reading...");
int i;
int r = randomGenerator.nextInt(100);
System.out.println("r "+r);
ThreadTesterRunner.randoms.add(r);
int a = this.readBarrier.arrive();
return r; //i;
}
}
Есть идеи, почему это может происходить?
РЕДАКТИРОВАТЬ:
Хорошо, я думаю, что у меня все работает
class ThreadRunner implements Runnable {
// static int timeOutTime = 2;
private ExecutorService executorService = Executors.newFixedThreadPool(10);
private final Phaser barrier;
private boolean finished = false;
public ThreadRunner(Phaser phaser) {
this.barrier = phaser;
}
public void run() {
try {
List<Future<Integer>> list = new CopyOnWriteArrayList<>();
boolean stillLoop = true;
int i = 0;
final Phaser p = this.barrier;
Callable<Integer> readerTask = new Callable<Integer>() {
public Integer call() throws Exception {
return new Reader().doRun(p);
}
};
List<Integer> randoms = new ArrayList<>();
Integer size;
while (stillLoop) {
if (i <= 2) {
list.add(executorService.submit(readerTask));
}
if (!list.isEmpty()) {
for (Future<Integer> f : list) {
if (f.isDone()) {
size = f.get();
randoms.add(size);
System.out.println("Process read with a size of "+ size);
list.remove(f);
} else {
// System.out.println("skipping");
}
}
} else {
stillLoop = false;
}
i++;
}
System.out.println("at barrier waiting");
this.barrier.awaitAdvance(0);
System.out.println("barrier crossed");
this.finished = true;
} catch (Exception e1) {
e1.printStackTrace();
}
}
public boolean isFinished() {
return this.finished;
}
}
1 ответ
Результаты: i 0 i 1 i 2 выходит из цикла Чтение... Чтение... r 13 r 44 Чтение... r 78 Печать метрик
Я изменил ArrayList на Vector, поскольку ArrayList не является потокобезопасным, что в конечном итоге приведет к исключению ConcurrentModificationException.
Является ли вышеуказанный вывод тем, что вы ожидаете?