Использование пула потоков для добавления в список

Я пытаюсь прочитать файл и добавить каждую строку в список.

Простой рисунок, объясняющий цель

Основной класс -

public class SimpleTreadPoolMain {

  public static void main(String[] args) {
    ReadFile reader = new ReadFile();
    File file = new File("C:\\myFile.csv");
    try {
        reader.readFile(file);
    } catch (IOException e) {
        e.printStackTrace();
    }
  }
}

Читательский класс -

public class ReadFile {

ExecutorService executor = Executors.newFixedThreadPool(5);//creating a pool of 5 threads

List<String> list = new ArrayList<>();

void readFile(File file) throws IOException {
    try (BufferedReader br = new BufferedReader(new FileReader(file))) {
        String line;
        while ((line = br.readLine()) != "") {
            Runnable saver = new SaveToList(line,list);  
            executor.execute(saver);//calling execute method of ExecutorService 
        }
    }

    executor.shutdown();  
    while (!executor.isTerminated()) {   }  

}

}

Заставка класс -

public class SaveToList<E> implements Runnable{

List<E> myList;

E line;

public SaveToList(E line, List<E> list) {
    this.line = line;
    this.myList = list;
}

public void run() {
    //modify the line
    myList.add(line);

}
}

Я попытался добавить несколько потоков заставок для добавления в один и тот же список, а не одну заставку, добавляющую в список одну за другой. Я хочу использовать потоки, потому что мне нужно изменить данные перед добавлением в список. Поэтому я предполагаю, что изменение данных займет некоторое время. Так что параллелизм в этой части уменьшит потребление времени, верно?

Но это не работает. Я не могу вернуть глобальный список, который включает в себя все значения из файла. Я хочу иметь только один глобальный список значений из файла. Так что код определенно должен измениться. Если кто-то может вести меня, это будет очень цениться.

Хотя добавление одного за другим в один поток будет работать, использование пула потоков сделает это быстрее, верно?

2 ответа

Использование нескольких потоков ничего не ускорит здесь.

Вы:

  • Чтение строки из файла, поочередно.
  • Создание работоспособного и отправка его в пул потоков
  • Runnable затем добавляет вещи в список

Учитывая, что вы используете ArrayListвам нужно синхронизировать доступ к нему, потому что вы мутируете его из нескольких потоков. Итак, вы добавляете вещи в список поочередно.

Но даже без синхронизации время, затрачиваемое на ввод-вывод, будет намного превышать время, необходимое для добавления строки в список. А добавление в многопоточность просто еще больше замедлит его, потому что он выполняет работу по созданию исполняемого файла, его передаче в пул потоков, планированию и т. Д.

Проще просто пропустить весь средний шаг:

  • Чтение строки из файла, поочередно.
  • Добавить список в список, поочередно.

Так:

try (BufferedReader br = new BufferedReader(new FileReader(file))) {
    String line;
    while (!(line = br.readLine()).isEmpty()) {
        list.add(line);
    }
}

На самом деле вы должны попробовать, если в вашем приложении стоит использовать многопоточность, просто сравните, сколько времени требуется, чтобы прочитать весь файл без какой-либо обработки на выполненных строках, и сравните его со временем, которое требуется для последовательной обработки всего файла.

Если ваш процесс не слишком сложен, я думаю, что не стоит использовать многопоточность.

Если вы обнаружите, что для этого требуется гораздо больше времени, вы можете подумать об использовании одного или нескольких потоков для выполнения вычислений.

Если это так, вы могли бы использовать Futures для обработки пакетов входных строк или, может быть, вы можете использовать потокобезопасную очередь для отправки строки другому процессу.

private static final int BATCH_SIZE = 1000;

public static void main(String[] args) throws IOException {

    BufferedReader reader = new BufferedReader(new InputStreamReader(new FileInputStream("big_file.csv"), "utf-8"));


    ExecutorService pool = Executors.newFixedThreadPool(8);
    String line;
    List<String> batch = new ArrayList<>(BATCH_SIZE);
    List<Future> results = new LinkedList<>();
    while((line=reader.readLine())!=null){
        batch.add(line);
        if(batch.size()>=BATCH_SIZE){
            Future<Object> f = noWaitExec(batch, pool);
            results.add(f);
            batch = new ArrayList<>(BATCH_SIZE);
        }
    }
    Future<List> f = noWaitExec(batch,pool);
    results.add(f);

    for (Future future : results) {
        try {
            Object object = future.get();
            // Use your results here 
        } catch (Exception e) {
            // Manage this....
        }
    }


}
private static Future<List> noWaitExec(final List<String> batch, ExecutorService pool) {
    return pool.submit(new Callable<List>() {
        public List call() throws Exception {
            List result = new ArrayList<>(batch.size());
            for (String string : batch) {
                result.add(process(string));
            }
            return result;
        }

    });
}

private static Object process(String string) {
    // Your process .... 
    return null;
};

Есть много других возможных решений (Observables, ParallelStreams, Pipes, CompletableFutures ... вы называете это), но я все же думаю, что большую часть времени тратится на чтение файла, просто используя BufferedInputStream для чтения файла с достаточно большой буфер может сократить ваши времена больше, чем параллельные вычисления.

Другие вопросы по тегам