ThreadPoolExecutor непредвиденная ошибка

Сложите ниже простую тестовую программу, которая должна параллельно выполнять некоторые задачи. Каждый раз мы отправляем 6 заданий и ждем завершения. Затем представлен другой набор задач.

 import java.util.concurrent.*;

public class ThreadExecutorTest {

  public static void main(String... args) {
    ThreadPoolExecutor ex   = new ThreadPoolExecutor(   15, 20, 10,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<Runnable>(5));

    for (int i = 0; i< 200; i++) {
      submitTasks(ex);
    }
    System.out.println("Done");
  }

  private static void submitTasks(ThreadPoolExecutor ex) {
    Future f1 = ex.submit( new SampleTask());
    Future f2 = ex.submit( new SampleTask());
    Future f3 = ex.submit( new SampleTask());
    Future f4 = ex.submit( new SampleTask());
    Future f5 = ex.submit( new SampleTask());
    Future f6 = ex.submit( new SampleTask());

//    System.out.println("Max Pool Size " + ex.getMaximumPoolSize());
    System.out.println("Pool Size " + ex.getPoolSize());
//    System.out.println("Active count " + ex.getActiveCount());
//    System.out.println("Task Count " + ex.getTaskCount());
//    System.out.println("Queue length " + ex.getQueue().size());
//    System.out.println("Queue remainingCapacity " + ((ArrayBlockingQueue)ex.getQueue()).remainingCapacity());

    try {
      f1.get();
    } catch (ExecutionException eex) {
      System.out.println("ExecutionException reported later - " + eex.getMessage());
    }catch(Exception exp){
      System.out.println("Exception reported later - " + exp.getMessage());
    }
    try{
      f2.get();
    }catch(Exception exp){}
    try{
      f3.get();
    }catch(Exception exp){}
    try{
      f4.get();
    }catch(Exception exp){}
    try{
      f5.get();
    }catch(Exception exp){}
    try{
      f6.get();
    }catch(Exception exp){}

  }

  static class SampleTask implements Callable<Void> {
    @Override
    public Void call() throws Exception {
      try {
//        Thread.sleep(300);
      } catch (Exception e) {
        System.out.println("Exception reported");
      }
      return null;
    }
  }
}

Но возникло следующее исключение, которое я не могу объяснить. Я предположил, что конфигурация ThreadPoolExecutor корректна для обработки 6 задач в любой момент.

Pool Size 6
Pool Size 12
Pool Size 15
Pool Size 16
Pool Size 17
Pool Size 18
Pool Size 19
Pool Size 20
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@2328c243 rejected from java.util.concurrent.ThreadPoolExecutor@bebdb06[Running, pool size = 20, active threads = 0, queued tasks = 0, completed tasks = 53]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:134)

1 ответ

ThreadPoolExecutor.execute есть комментарий, описывающий, как он ведет себя при отправке новой задачи:

    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */

В вашем случае, когда вы отправляете пакеты по 6 задач одновременно, когда текущий размер пула меньше размера ядра, эти представления немедленно отправляются новым рабочим потокам (см. Переходы от 0 до 6 и от 6 до 12).

После того, как вы превысили размер основного пула, но все еще меньше максимального размера, задачи отправляются в очередь, а затем выполняются асинхронно для запуска в существующем рабочем потоке, пока очередь не заполнена. Поскольку все эти задачи отправляются последовательно, существует высокая вероятность того, что все шесть будут отправлены до того, как кто-либо будет удален из очереди; таким образом, первые пять будут поставлены в очередь, а оставшиеся - перейдут к шагу 3 описанного выше процесса: создается новый рабочий поток, и эта задача запускается немедленно. (Это объясняет более поздние скачки с 15 до 16, с 16 до 17 и т. Д.)

В конечном итоге это приводит к тому, что пул потоков имеет максимальное количество рабочих потоков, и когда выполняется шаг 3 вышеуказанного процесса (как в последнем абзаце), Исполнитель не может создать нового работника и отклоняет задачу. По сути, даже несмотря на то, что существуют доступные рабочие потоки, вы не дали исполнителю времени вытянуть задачи из очереди для их выполнения, прежде чем переполнять очередь.

Другие вопросы по тегам