Java Fork-Join не работает с большим ArrayList

Я довольно плохо знаком с параллелизмом и параллелизмом и пытаюсь реализовать алгоритм медианного фильтра, используя Fork-Join в Java. По сути, я считываю входной файл в ArrayList и использую этот список для генерации нового ArrayList отфильтрованных медиан (включая первый и последний элемент исходного ArrayList).

Теперь мне удалось сделать последовательную / последовательную версию алгоритма, и она отлично работает. Однако когда я попытался создать версию Fork-Join, она не работала для больших списков ArrayLists(100000+). Я попробовал это с очень маленьким ArrayList размера 5, и он отлично работает. Кажется, я не могу найти свою ошибку (я уверен, что это логическая ошибка и / или ошибка реализации). Любая помощь будет оценена.

Вот фрагмент последовательного алгоритма:

    //Add first boundary element to output ArrayList
    outputElements.add(this.elements.get(0));

    //Start Filter Algorithm 
    while(elements.size()-counter >= filterSize){
        for(int i = 0; i<filterSize; i++){
            tempElements.add(this.elements.get(i+counter));
            if(i==filterSize){
                break;
            }
        }

        Collections.sort(tempElements);
        outputElements.add(tempElements.get((filterSize-1)/2));

        counter++;
        tempElements.clear();
    }

    //Add last boundary element to output ArrayList.
    if (elements != null && !elements.isEmpty()) {
        outputElements.add(elements.get(elements.size()-1));
    }//End Filter Algorithm

Вот параллельный класс, который я сделал. Это часть, которая не работает:

public class Parallel extends RecursiveTask<List<Float>>{
    int lo;
    int hi;
    int filterSize;
    String outFile; //Output file name.
    int arraySize;
    List<Float> elements = new ArrayList<Float>();
    List<Float> tempElements = new ArrayList<Float>();
    List<Float> outputElements = new ArrayList<Float>();
    int counter = 0;
    static final int SEQUENTIAL_CUTOFF=1000;

    public Parallel(List<Float> elements, int filterSize, String outFile, int lo, int hi) {
        this.lo = lo;
        this.hi = hi;
        this.elements = elements;
        this.outFile = outFile;
        this.filterSize = filterSize;       
        if(lo == 0){
            outputElements.add(this.elements.get(0));
        }
    }
    @Override
    protected List<Float> compute() {
        long startTime = System.nanoTime(); //Algorithm starts here 
        if((hi-lo) < SEQUENTIAL_CUTOFF) {
            while(hi-counter >= filterSize){
                for(int i = lo; i<filterSize; i++){
                    tempElements.add(this.elements.get(i+counter));
                    if(i==filterSize){
                        break;
                    }
                }               
                Collections.sort(tempElements);
                outputElements.add(tempElements.get((filterSize-1)/2));
                counter++;
                tempElements.clear();
                return outputElements;
            }
          }else{              
              Parallel left = new Parallel(this.elements, this.filterSize, this.outFile, this.lo, ((this.lo + this.hi)/2));
              Parallel right = new Parallel(this.elements, this.filterSize, this.outFile, ((this.hi + this.lo)/2), this.hi);
              left.fork();

              List<Float> leftArr = new ArrayList<Float>();
              List<Float> rightArr = new ArrayList<Float>();

             rightArr =  right.compute();
             leftArr = left.join();

             List<Float> newList = new ArrayList<Float>();
             newList.addAll(leftArr);
             newList.addAll(rightArr);       

          }
        long endTime = System.nanoTime();//Algorithm ends here.

        //Write elements to output file 
        PrintWriter writeOutput = null;
        try {
            writeOutput = new PrintWriter(this.outFile, "UTF-8");
        } catch (FileNotFoundException | UnsupportedEncodingException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        writeOutput.println(outputElements.size());//Number of lines
        for(int i=0; i<outputElements.size();i++){
            writeOutput.println(i+1 + " " + outputElements.get(i)); //Each line is written
        }

        writeOutput.close(); //Close when output finished writing.
        System.out.println("Parallel complete");
        return null;
    }
}

Любая помощь очень ценится. Я не могу понять это сразу, потратив несколько часов на изучение SO и Google.

Редактировать: music_coder предложил опубликовать ошибки, с которыми я сталкиваюсь, и вот они. Это много ошибок:

Exception in thread "main" java.lang.IndexOutOfBoundsException
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
    at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:536)
    at java.util.concurrent.ForkJoinTask.reportResult(ForkJoinTask.java:596)
    at java.util.concurrent.ForkJoinTask.join(ForkJoinTask.java:640)
    at java.util.concurrent.ForkJoinPool.invoke(ForkJoinPool.java:1521)
    at main.main(main.java:45)
Caused by: java.lang.IndexOutOfBoundsException: Index: 1, Size: 0
    at java.util.ArrayList.rangeCheck(ArrayList.java:635)
    at java.util.ArrayList.get(ArrayList.java:411)
    at Parallel.compute(Parallel.java:44)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:57)
    at Parallel.compute(Parallel.java:1)
    at java.util.concurrent.RecursiveTask.exec(RecursiveTask.java:93)
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:334)
    at java.util.concurrent.ForkJoinWorkerThread.execTask(ForkJoinWorkerThread.java:604)
    at java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:784)
    at java.util.concurrent.ForkJoinPool.work(ForkJoinPool.java:646)
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:398)

1 ответ

В общем, вы должны избегать использования ArrayList s в многопоточном коде, потому что он не является потокобезопасным:

Обратите внимание, что эта реализация не синхронизирована. Если несколько потоков обращаются к ArrayList Например, если хотя бы один из потоков изменяет список структурно, он должен быть синхронизирован извне.

Я не вижу в опубликованных вами фрагментах ничего, что изменяло бы списки одновременно, но я вижу, что вы выделили this.elements ребенку Parallel экземпляры, означающие, как минимум, что вы делаете что-то рискованное (разделение указателей на изменяемые, не поточечные объекты между потоками)

В качестве первого прохода замените this.elements = elements; в вашем Parallel конструктор со следующим:

this.elements = Collections.unmodifiableList(elements);

Делая список неизменяемым, вы убедитесь, что если ваш Parallel Код пытается изменить список, вы получите явную ошибку прямо в точке сбоя. Это не мешает чему-то другому, снаружи Parallel от изменения оригинала elements список, но это быстрый и простой способ проверить Parallel ведет себя правильно. Если вы получите UnsupportedOperationException, ваш Parallel Класс должен быть изменен - ​​вы не можете изменить ArrayList одновременно.

Если вы не получите UnsupportedOperationException что-то еще изменяет ваш список. Вам нужно найти это и удалить его.


После того, как вы выяснили, что вызывает одновременное изменение вашего списка, вы можете попытаться определить лучший путь вперед. Прохождение всех "правильных" способов обмена данными между потоками выходит за рамки того, что я могу надеяться охватить в этом ответе, но вот несколько общих правил:

  • Избегайте изменяемых структур данных - создайте свой Parallel класс для обработки данных только из неизменяемых структур данных, таких как Guava ImmutableList, Неизменяемые структуры данных по умолчанию являются потокобезопасными.
  • Используйте поточно-ориентированные структуры данных - например, ConcurrentLinkedQueue это потокобезопасный способ чтения и записи в одну и ту же структуру данных несколькими процессами. ConcurrentHashMap еще один часто используемый класс. То, что вам нужно, во многом зависит от того, что вы пытаетесь сделать, но это хорошая отправная точка.
  • Минимизируйте объем ваших параллельных операций - даже при одновременных структурах данных ваша общая цель должна состоять в том, чтобы каждая задача выполнялась изолированно, за исключением чтения из общего источника и записи в общий приемник. Делайте как можно больше работы с объектами, которые видны только одному потоку.
  • Синхронизировать - я заметил, что Parallel пишет outFile без какой-либо явной синхронизации. Это опасно и может привести к проблемам (сбоям или ухудшению повреждения данных). Только один поток должен когда-либо записывать в файл одновременно. Сделайте это либо с помощью специального потока для записи файлов, либо путем явной синхронизации ваших операций записи файлов.
Другие вопросы по тегам