Многопоточность Java - подача газа в ExecutorService
У меня есть файл данных с тысячами строк. Я читаю их и сохраняю в базе данных. Я хочу многопоточность этого процесса в пакетах, скажем, 50 строк. Как я прочитал в файле, 10 строк передаются в ExecutorService.
ExecutorService executor = Executors.newFixedThreadPool(5);`
Я могу сделать ниже в цикле в то время как мои строки заканчиваются....
Future<Integer> future = executor.submit(callableObjectThatSaves10RowsAtOneTime);
Но я не хочу читать весь файл в память, если обработка 10 строк занимает время. Я хочу отправить только 5, пока не вернется один из потоков, а затем отправлю следующий.
Допустим, поток занимает 20 секунд, чтобы сохранить 10 записей, я не хочу ExecutorService
получать тысячи строк, поскольку процесс чтения продолжает читать и отправлять в ExecutorService
Каков наилучший способ достичь этого?
1 ответ
Вы можете сделать это с LinkedList<Future<?>>
в котором хранятся фьючерсы, пока вы не достигнете заранее определенного размера. Вот некоторый скелетный код, который поможет вам пройти большую часть пути:
int threads = 5;
ExecutorService service = Executors.newFixedThreadPool(threads);
LinkedList<Future<?>> futures = new LinkedList<>();
//As long as there are rows to save:
while(moreRowsLeft()){
//dump another callable onto the queue:
futures.addLast(service.submit(new RowSavingCallable());
//if the queue is "full", wait for the next one to finish before
//reading any more of the file:
while(futures.size() >= 2*threads) futures.removeFirst().get();
}
//All rows have been submitted but some may still be writing to the DB:
for(Future<?> f : futures) future.get();
//All rows have been saved at this point
Вы можете задаться вопросом, почему я позволил числу фьючерсов в два раза увеличить количество потоков на машине - это позволяет потокам службы исполнителя работать над сохранением базы данных, в то время как основной поток создает больше работы. Это может помочь скрыть любые затраты ввода-вывода, связанные с предоставлением большего количества вызываемых объектов для обработки, пока рабочие потоки заняты записью в базу данных.