Как указать ForkJoinPool для Java 8 параллельный поток?
Как я знаю, параллельные потоки используют по умолчанию ForkJoinPool.commonPool
который по умолчанию имеет на один поток меньше, чем ваши процессоры. Я хочу использовать свой собственный пул потоков.
Как это:
@Test
public void stream() throws Exception {
//System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Lists.newArrayList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (Exception e) {
}
return item * 10;
})).get().collect(Collectors.toList());
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);
}
Мой обычай ForkJoinPool
никогда не используется. И я изменяю параллелизм по умолчанию следующим образом:
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "20");
Это работает хорошо - задачи стоят всего около 1 секунды.
В моем приложении задача содержит тяжелую операцию ввода-вывода (чтение данных из базы данных). Поэтому мне нужен более высокий параллелизм, но я не хочу менять свойство JVM.
Итак, как правильно указать мой собственный ForkJoinPool
?
Или как использовать параллельные потоки в IO-интенсивной ситуации?
2 ответа
Потоки ленивы; Вся работа выполняется, когда вы начинаете работу терминала. В вашем случае, терминал работает .collect(Collectors.toList())
, который вы называете в main
поток на результат get()
, Поэтому фактическая работа будет выполняться так же, как если бы вы построили весь поток в main
нить.
Чтобы ваш пул имел эффект, вы должны переместить операцию терминала в отправленную задачу:
ForkJoinPool pool = new ForkJoinPool(10);
List<Integer> testList = Arrays.asList(
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20);
long start = System.currentTimeMillis();
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
}).collect(Collectors.toList())).join();
System.out.println(result);
System.out.println(System.currentTimeMillis() - start);
Мы также можем продемонстрировать актуальность работы терминала, построив поток в main
поток и только отправка операции терминала в пул:
Stream<Integer> stream = testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (InterruptedException e) {}
return item * 10;
});
List<Integer> result = pool.submit(() -> stream.collect(Collectors.toList())).join();
Но вы должны иметь в виду, что это недокументированное поведение, которое не гарантируется. Фактический ответ должен состоять в том, что Stream API в его текущей форме, без контроля потока (и без помощи для работы с проверенными исключениями), не подходит для параллельных операций ввода-вывода.
Я полагаю, вы обнаружили трюк, описанный здесь:
в котором говорится
Хитрость основана на
ForkJoinTask.fork
в котором указано: "Обеспечивает асинхронное выполнение этой задачи в пуле, в котором выполняется текущая задача, если это применимо, или с использованиемForkJoinPool.commonPool()
если неinForkJoinPool()
"
В вашем коде parallelStream()
а также map(...)
вызываются в обычае ForkJoinPool
, но Function
перешел к map
не является.
Помни что Stream#map
это промежуточная операция. это Function
будет выполняться для его элемента только после того, как терминальная операция будет связана. В вашем случае эта операция терминала collect(...)
, И с тех пор collect(Collectors.toList()
вызывается в main
нить map
"s Function
вызывается на каждом элементе параллельно в commonPool
,
Вы можете просто переместить collect(...)
позвоните в свой submit(...)
,
List<Integer> result = pool.submit(() -> testList.parallelStream().map(item -> {
try {
// read from database
Thread.sleep(1000);
System.out.println("task" + item + ":" + Thread.currentThread());
} catch (Exception e) {
}
return item * 10;
}).collect(Collectors.toList())).get();