Java Fork-Join не работает с большим ArrayList
Я довольно плохо знаком с параллелизмом и параллелизмом и пытаюсь реализовать алгоритм медианного фильтра, используя Fork-Join в Java. По сути, я считываю входной файл в ArrayList и использую этот список для генерации нового ArrayList отфильтрованных медиан (включая первый и последний элемент исходного ArrayList).
Теперь мне удалось сделать последовательную / последовательную версию алгоритма, и она отлично работает. Однако когда я попытался создать версию Fork-Join, она не работала для больших списков ArrayLists(100000+). Я попробовал это с очень маленьким ArrayList размера 5, и он отлично работает. Кажется, я не могу найти свою ошибку (я уверен, что это логическая ошибка и / или ошибка реализации). Любая помощь будет оценена.
Вот фрагмент последовательного алгоритма:
//Add first boundary element to output ArrayList
outputElements.add(this.elements.get(0));
//Start Filter Algorithm
while(elements.size()-counter >= filterSize){
for(int i = 0; i<filterSize; i++){
tempElements.add(this.elements.get(i+counter));
if(i==filterSize){
break;
}
}
Collections.sort(tempElements);
outputElements.add(tempElements.get((filterSize-1)/2));
counter++;
tempElements.clear();
}
//Add last boundary element to output ArrayList.
if (elements != null && !elements.isEmpty()) {
outputElements.add(elements.get(elements.size()-1));
}//End Filter Algorithm
Вот параллельный класс, который я сделал. Это часть, которая не работает:
public class Parallel extends RecursiveTask<List<Float>>{
int lo;
int hi;
int filterSize;
String outFile; //Output file name.
int arraySize;
List<Float> elements = new ArrayList<Float>();
List<Float> tempElements = new ArrayList<Float>();
List<Float> outputElements = new ArrayList<Float>();
int counter = 0;
static final int SEQUENTIAL_CUTOFF=1000;
public Parallel(List<Float> elements, int filterSize, String outFile, int lo, int hi) {
this.lo = lo;
this.hi = hi;
this.elements = elements;
this.outFile = outFile;
this.filterSize = filterSize;
if(lo == 0){
outputElements.add(this.elements.get(0));
}
}
@Override
protected List<Float> compute() {
long startTime = System.nanoTime(); //Algorithm starts here
if((hi-lo) < SEQUENTIAL_CUTOFF) {
while(hi-counter >= filterSize){
for(int i = lo; i<filterSize; i++){
tempElements.add(this.elements.get(i+counter));
if(i==filterSize){
break;
}
}
Collections.sort(tempElements);
outputElements.add(tempElements.get((filterSize-1)/2));
counter++;
tempElements.clear();
return outputElements;
}
}else{
Parallel left = new Parallel(this.elements, this.filterSize, this.outFile, this.lo, ((this.lo + this.hi)/2));
Parallel right = new Parallel(this.elements, this.filterSize, this.outFile, ((this.hi + this.lo)/2), this.hi);
left.fork();
List<Float> leftArr = new ArrayList<Float>();
List<Float> rightArr = new ArrayList<Float>();
rightArr = right.compute();
leftArr = left.join();
List<Float> newList = new ArrayList<Float>();
newList.addAll(leftArr);
newList.addAll(rightArr);
}
long endTime = System.nanoTime();//Algorithm ends here.
//Write elements to output file
PrintWriter writeOutput = null;
try {
writeOutput = new PrintWriter(this.outFile, "UTF-8");
} catch (FileNotFoundException | UnsupportedEncodingException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
writeOutput.println(outputElements.size());//Number of lines
for(int i=0; i<outputElements.size();i++){
writeOutput.println(i+1 + " " + outputElements.get(i)); //Each line is written
}
writeOutput.close(); //Close when output finished writing.
System.out.println("Parallel complete");
return null;
}
}
Любая помощь очень ценится. Я не могу понять это сразу, потратив несколько часов на изучение SO и Google.
Редактировать: music_coder предложил опубликовать ошибки, с которыми я сталкиваюсь, и вот они. Это много ошибок:
Exception in thread "main" java.lang.IndexOutOfBoundsException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536)
at java.util.concurrent.ForkJoinTask.reportResult(ForkJoinTask.java:596)
at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:640)
at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:1521)
at main.main(main.java:45)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
at java.util.ArrayList.rangeCheck(ArrayList.java:635)
at java.util.ArrayList.get(ArrayList.java:411)
at Parallel.compute(Parallel.java:44)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:57)
at Parallel.compute(Parallel.java:1)
at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:93)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)
1 ответ
В общем, вы должны избегать использования ArrayList
s в многопоточном коде, потому что он не является потокобезопасным:
Обратите внимание, что эта реализация не синхронизирована. Если несколько потоков обращаются к
ArrayList
Например, если хотя бы один из потоков изменяет список структурно, он должен быть синхронизирован извне.
Я не вижу в опубликованных вами фрагментах ничего, что изменяло бы списки одновременно, но я вижу, что вы выделили this.elements
ребенку Parallel
экземпляры, означающие, как минимум, что вы делаете что-то рискованное (разделение указателей на изменяемые, не поточечные объекты между потоками)
В качестве первого прохода замените this.elements = elements;
в вашем Parallel
конструктор со следующим:
this.elements = Collections.unmodifiableList(elements);
Делая список неизменяемым, вы убедитесь, что если ваш Parallel
Код пытается изменить список, вы получите явную ошибку прямо в точке сбоя. Это не мешает чему-то другому, снаружи Parallel
от изменения оригинала elements
список, но это быстрый и простой способ проверить Parallel
ведет себя правильно. Если вы получите UnsupportedOperationException
, ваш Parallel
Класс должен быть изменен - вы не можете изменить ArrayList
одновременно.
Если вы не получите UnsupportedOperationException
что-то еще изменяет ваш список. Вам нужно найти это и удалить его.
После того, как вы выяснили, что вызывает одновременное изменение вашего списка, вы можете попытаться определить лучший путь вперед. Прохождение всех "правильных" способов обмена данными между потоками выходит за рамки того, что я могу надеяться охватить в этом ответе, но вот несколько общих правил:
- Избегайте изменяемых структур данных - создайте свой
Parallel
класс для обработки данных только из неизменяемых структур данных, таких как GuavaImmutableList
, Неизменяемые структуры данных по умолчанию являются потокобезопасными. - Используйте поточно-ориентированные структуры данных - например,
ConcurrentLinkedQueue
это потокобезопасный способ чтения и записи в одну и ту же структуру данных несколькими процессами.ConcurrentHashMap
еще один часто используемый класс. То, что вам нужно, во многом зависит от того, что вы пытаетесь сделать, но это хорошая отправная точка. - Минимизируйте объем ваших параллельных операций - даже при одновременных структурах данных ваша общая цель должна состоять в том, чтобы каждая задача выполнялась изолированно, за исключением чтения из общего источника и записи в общий приемник. Делайте как можно больше работы с объектами, которые видны только одному потоку.
- Синхронизировать - я заметил, что
Parallel
пишетoutFile
без какой-либо явной синхронизации. Это опасно и может привести к проблемам (сбоям или ухудшению повреждения данных). Только один поток должен когда-либо записывать в файл одновременно. Сделайте это либо с помощью специального потока для записи файлов, либо путем явной синхронизации ваших операций записи файлов.