ThreadPoolExecutor непредвиденная ошибка
Сложите ниже простую тестовую программу, которая должна параллельно выполнять некоторые задачи. Каждый раз мы отправляем 6 заданий и ждем завершения. Затем представлен другой набор задач.
import java.util.concurrent.*;
public class ThreadExecutorTest {
public static void main(String... args) {
ThreadPoolExecutor ex = new ThreadPoolExecutor( 15, 20, 10,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(5));
for (int i = 0; i< 200; i++) {
submitTasks(ex);
}
System.out.println("Done");
}
private static void submitTasks(ThreadPoolExecutor ex) {
Future f1 = ex.submit( new SampleTask());
Future f2 = ex.submit( new SampleTask());
Future f3 = ex.submit( new SampleTask());
Future f4 = ex.submit( new SampleTask());
Future f5 = ex.submit( new SampleTask());
Future f6 = ex.submit( new SampleTask());
// System.out.println("Max Pool Size " + ex.getMaximumPoolSize());
System.out.println("Pool Size " + ex.getPoolSize());
// System.out.println("Active count " + ex.getActiveCount());
// System.out.println("Task Count " + ex.getTaskCount());
// System.out.println("Queue length " + ex.getQueue().size());
// System.out.println("Queue remainingCapacity " + ((ArrayBlockingQueue)ex.getQueue()).remainingCapacity());
try {
f1.get();
} catch (ExecutionException eex) {
System.out.println("ExecutionException reported later - " + eex.getMessage());
}catch(Exception exp){
System.out.println("Exception reported later - " + exp.getMessage());
}
try{
f2.get();
}catch(Exception exp){}
try{
f3.get();
}catch(Exception exp){}
try{
f4.get();
}catch(Exception exp){}
try{
f5.get();
}catch(Exception exp){}
try{
f6.get();
}catch(Exception exp){}
}
static class SampleTask implements Callable<Void> {
@Override
public Void call() throws Exception {
try {
// Thread.sleep(300);
} catch (Exception e) {
System.out.println("Exception reported");
}
return null;
}
}
}
Но возникло следующее исключение, которое я не могу объяснить. Я предположил, что конфигурация ThreadPoolExecutor корректна для обработки 6 задач в любой момент.
Pool Size 6
Pool Size 12
Pool Size 15
Pool Size 16
Pool Size 17
Pool Size 18
Pool Size 19
Pool Size 20
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2328c243 rejected from java.util.concurrent.ThreadPoolExecutor@bebdb06[Running, pool size = 20, active threads = 0, queued tasks = 0, completed tasks = 53]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)
1 ответ
ThreadPoolExecutor.execute
есть комментарий, описывающий, как он ведет себя при отправке новой задачи:
/* * Proceed in 3 steps: * * 1. If fewer than corePoolSize threads are running, try to * start a new thread with the given command as its first * task. The call to addWorker atomically checks runState and * workerCount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. If a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. So we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. If we cannot queue task, then we try to add a new * thread. If it fails, we know we are shut down or saturated * and so reject the task. */
В вашем случае, когда вы отправляете пакеты по 6 задач одновременно, когда текущий размер пула меньше размера ядра, эти представления немедленно отправляются новым рабочим потокам (см. Переходы от 0 до 6 и от 6 до 12).
После того, как вы превысили размер основного пула, но все еще меньше максимального размера, задачи отправляются в очередь, а затем выполняются асинхронно для запуска в существующем рабочем потоке, пока очередь не заполнена. Поскольку все эти задачи отправляются последовательно, существует высокая вероятность того, что все шесть будут отправлены до того, как кто-либо будет удален из очереди; таким образом, первые пять будут поставлены в очередь, а оставшиеся - перейдут к шагу 3 описанного выше процесса: создается новый рабочий поток, и эта задача запускается немедленно. (Это объясняет более поздние скачки с 15 до 16, с 16 до 17 и т. Д.)
В конечном итоге это приводит к тому, что пул потоков имеет максимальное количество рабочих потоков, и когда выполняется шаг 3 вышеуказанного процесса (как в последнем абзаце), Исполнитель не может создать нового работника и отклоняет задачу. По сути, даже несмотря на то, что существуют доступные рабочие потоки, вы не дали исполнителю времени вытянуть задачи из очереди для их выполнения, прежде чем переполнять очередь.