Java ThreadPoolExecutor зависает при использовании ArrayBlockingQueue
Я работаю над некоторым приложением и использую ThreadPoolExecutor для обработки различных задач. ThreadPoolExecutor застревает через некоторое время. Чтобы смоделировать это в более простой среде, я написал простой код, в котором я могу смоделировать проблему.
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
public class MyThreadPoolExecutor {
private int poolSize = 10;
private int maxPoolSize = 50;
private long keepAliveTime = 10;
private ThreadPoolExecutor threadPool = null;
private final ArrayBlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(
100000);
public MyThreadPoolExecutor() {
threadPool = new ThreadPoolExecutor(poolSize, maxPoolSize,
keepAliveTime, TimeUnit.SECONDS, queue);
threadPool.setRejectedExecutionHandler(new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable runnable,
ThreadPoolExecutor threadPoolExecutor) {
System.out
.println("Execution rejected. Please try restarting the application.");
}
});
}
public void runTask(Runnable task) {
threadPool.execute(task);
}
public void shutDown() {
threadPool.shutdownNow();
}
public ThreadPoolExecutor getThreadPool() {
return threadPool;
}
public void setThreadPool(ThreadPoolExecutor threadPool) {
this.threadPool = threadPool;
}
public static void main(String[] args) {
MyThreadPoolExecutor mtpe = new MyThreadPoolExecutor();
for (int i = 0; i < 1000; i++) {
final int j = i;
mtpe.runTask(new Runnable() {
@Override
public void run() {
System.out.println(j);
}
});
}
}
}
Попробуйте выполнить этот код несколько раз. Обычно он выводит номер на консоль, и когда все потоки заканчиваются, он существует. Но иногда он выполнил все задачи, а затем не завершается. Дамп потока выглядит следующим образом:
MyThreadPoolExecutor [Java Application]
MyThreadPoolExecutor at localhost:2619 (Suspended)
Daemon System Thread [Attach Listener] (Suspended)
Daemon System Thread [Signal Dispatcher] (Suspended)
Daemon System Thread [Finalizer] (Suspended)
Object.wait(long) line: not available [native method]
ReferenceQueue<T>.remove(long) line: not available
ReferenceQueue<T>.remove() line: not available
Finalizer$FinalizerThread.run() line: not available
Daemon System Thread [Reference Handler] (Suspended)
Object.wait(long) line: not available [native method]
Reference$Lock(Object).wait() line: 485
Reference$ReferenceHandler.run() line: not available
Thread [pool-1-thread-1] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-2] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-3] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-4] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-6] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-8] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-5] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-10] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-9] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [pool-1-thread-7] (Suspended)
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.park(Object) line: not available
AbstractQueuedSynchronizer$ConditionObject.await() line: not available
ArrayBlockingQueue<E>.take() line: not available
ThreadPoolExecutor.getTask() line: not available
ThreadPoolExecutor$Worker.run() line: not available
Thread.run() line: not available
Thread [DestroyJavaVM] (Suspended)
C:\Program Files\Java\jre1.6.0_07\bin\javaw.exe (Jun 17, 2010 10:42:33 AM)
В моем реальном приложении потоки ThreadPoolExecutor переходят в это состояние и перестают отвечать.
С уважением, Рави Рао
1 ответ
В вашем main
метод, вы никогда не звоните mtpe.shutdown()
, ThreadPoolExecutor будет пытаться сохранить его corePoolSize
живы до бесконечности. Иногда вам везет, и у вас больше, чем corePoolSize
потоки живы, поэтому каждый рабочий поток перейдет в ветвь условной логики, которая позволит ему завершиться через заданный период ожидания 10 секунд. Однако, как вы заметили, иногда это не так, поэтому каждый поток в исполнителе блокирует ArrayBlockingQueue.take() и ожидает новую задачу.
Также обратите внимание, что существует существенная разница между ExecutorService.shutdown() и ExecutorService.shutdownNow(). Если вы вызовете ExecutorService.shutdownNow(), как указывает ваша реализация оболочки, вы будете иногда отбрасывать некоторые задачи, которые не были назначены для выполнения.
Обновление: так как мой оригинальный ответ, ThreadPoolExecutor
реализация изменилась так, что программа в исходном посте никогда не должна выходить.