Динамическое изменение размера java.util.concurrent.ThreadPoolExecutor, в то время как у него есть ожидающие задачи
Я работаю с java.util.concurrent.ThreadPoolExecutor
обрабатывать несколько элементов параллельно. Несмотря на то, что сам процесс потоков работает нормально, иногда мы сталкиваемся с другими ограничениями ресурсов из-за действий, происходящих в потоках, из-за которых нам хотелось набрать количество потоков в пуле.
Я хотел бы знать, есть ли способ набрать номер потока, в то время как потоки фактически работают. Я знаю, что вы можете позвонить setMaximumPoolSize()
и / или setCorePoolSize()
, но они изменяют размер пула только после того, как потоки становятся бездействующими, но они не становятся бездействующими, пока в очереди не ожидают никаких задач.
5 ответов
Насколько я могу судить, это невозможно сделать в чистом виде.
Вы можете реализовать метод beforeExecute, чтобы проверить некоторое логическое значение и принудительно остановить потоки. Имейте в виду, что они будут содержать задачу, которая не будет выполнена, пока они не будут повторно включены.
В качестве альтернативы вы можете реализовать afterExecute, чтобы генерировать RuntimeException, когда вы насыщены. Это фактически приведет к смерти потока, и, поскольку исполнитель будет выше максимума, новый не будет создан.
Я не рекомендую вам делать тоже. Вместо этого попробуйте найти другой способ управления одновременным выполнением задач, которые вызывают у вас проблемы. Возможно, выполняя их в отдельном пуле потоков с более ограниченным числом работников.
Вы абсолютно можете. призвание setCorePoolSize(int)
изменит размер ядра пула. Вызовы этого метода являются поточно-ориентированными и переопределяют настройки, предоставляемые конструктору ThreadPoolExecutor
, Если вы урезаете размер пула, остальные потоки будут закрыты, как только их текущая очередь заданий будет завершена (если они простаивают, они будут немедленно закрыты). Если вы увеличиваете размер пула, новые потоки будут распределены как можно скорее. Сроки для выделения новых потоков не документированы, но в реализации распределение новых потоков выполняется при каждом обращении к execute
метод.
Чтобы связать это с настраиваемой фермой заданий во время выполнения, вы можете предоставить это свойство (либо с помощью оболочки, либо с помощью динамического экспортера MBean) в качестве атрибута JMX для чтения и записи, чтобы создать довольно приятный, настраиваемый пакетный процессор на лету.
Чтобы принудительно уменьшить размер пула во время выполнения (что является вашим запросом), вы должны создать подкласс ThreadPoolExecutor
и добавить нарушение к beforeExecute(Thread,Runnable)
метод. Прерывание потока не является достаточным нарушением, поскольку оно взаимодействует только с состояниями ожидания и во время обработки ThreadPoolExecutor
потоки задач не переходят в прерываемое состояние.
Недавно я столкнулся с той же проблемой, пытаясь принудительно завершить пул потоков перед выполнением всех отправленных задач. Чтобы это произошло, я прервал поток, выдав исключение времени выполнения только после замены UncaughtExceptionHandler
потока с тем, который ожидает моего конкретного исключения и отбрасывает его.
/**
* A runtime exception used to prematurely terminate threads in this pool.
*/
static class ShutdownException
extends RuntimeException {
ShutdownException (String message) {
super(message);
}
}
/**
* This uncaught exception handler is used only as threads are entered into
* their shutdown state.
*/
static class ShutdownHandler
implements UncaughtExceptionHandler {
private UncaughtExceptionHandler handler;
/**
* Create a new shutdown handler.
*
* @param handler The original handler to deligate non-shutdown
* exceptions to.
*/
ShutdownHandler (UncaughtExceptionHandler handler) {
this.handler = handler;
}
/**
* Quietly ignore {@link ShutdownException}.
* <p>
* Do nothing if this is a ShutdownException, this is just to prevent
* logging an uncaught exception which is expected. Otherwise forward
* it to the thread group handler (which may hand it off to the default
* uncaught exception handler).
* </p>
*/
public void uncaughtException (Thread thread, Throwable throwable) {
if (!(throwable instanceof ShutdownException)) {
/* Use the original exception handler if one is available,
* otherwise use the group exception handler.
*/
if (handler != null) {
handler.uncaughtException(thread, throwable);
}
}
}
}
/**
* Configure the given job as a spring bean.
*
* <p>Given a runnable task, configure it as a prototype spring bean,
* injecting any necessary dependencices.</p>
*
* @param thread The thread the task will be executed in.
* @param job The job to configure.
*
* @throws IllegalStateException if any error occurs.
*/
protected void beforeExecute (final Thread thread, final Runnable job) {
/* If we're in shutdown, it's because spring is in singleton shutdown
* mode. This means we must not attempt to configure the bean, but
* rather we must exit immediately (prematurely, even).
*/
if (!this.isShutdown()) {
if (factory == null) {
throw new IllegalStateException(
"This class must be instantiated by spring"
);
}
factory.configureBean(job, job.getClass().getName());
}
else {
/* If we are in shutdown mode, replace the job on the queue so the
* next process will see it and it won't get dropped. Further,
* interrupt this thread so it will no longer process jobs. This
* deviates from the existing behavior of shutdown().
*/
workQueue.add(job);
thread.setUncaughtExceptionHandler(
new ShutdownHandler(thread.getUncaughtExceptionHandler())
);
/* Throwing a runtime exception is the only way to prematurely
* cause a worker thread from the TheadPoolExecutor to exit.
*/
throw new ShutdownException("Terminating thread");
}
}
В вашем случае вы можете захотеть создать семафор (просто для использования в качестве счетчика потока безопасности), который не имеет разрешений, а при закрытии потоков высвобождает ему количество разрешений, соответствующее дельте предыдущего размера основного пула и новый размер пула (требующий переопределения setCorePoolSize(int)
метод). Это позволит вам завершить ваши потоки после завершения их текущей задачи.
private Semaphore terminations = new Semaphore(0);
protected void beforeExecute (final Thread thread, final Runnable job) {
if (terminations.tryAcquire()) {
/* Replace this item in the queue so it may be executed by another
* thread
*/
queue.add(job);
thread.setUncaughtExceptionHandler(
new ShutdownHandler(thread.getUncaughtExceptionHandler())
);
/* Throwing a runtime exception is the only way to prematurely
* cause a worker thread from the TheadPoolExecutor to exit.
*/
throw new ShutdownException("Terminating thread");
}
}
public void setCorePoolSize (final int size) {
int delta = getActiveCount() - size;
super.setCorePoolSize(size);
if (delta > 0) {
terminations.release(delta);
}
}
Это должно прервать n потоков для f(n) = active - запрошено. Если есть какие-либо проблемы, ThreadPoolExecutor
Стратегия распределения достаточно долговечна. Он ведет учет преждевременного прекращения, используя finally
блок, который гарантирует исполнение. По этой причине, даже если вы прервете слишком много потоков, они будут переполнены.
Решение состоит в том, чтобы опустошить очередь ThreadPoolExecutor, установить необходимый размер ThreadPoolExecutor и затем добавить потоки по очереди, как только остальные закончатся. Метод для слива очереди в классе ThreadPoolExecutor является закрытым, поэтому вы должны создать его самостоятельно. Вот код:
/**
* Drains the task queue into a new list. Used by shutdownNow.
* Call only while holding main lock.
*/
public static List<Runnable> drainQueue() {
List<Runnable> taskList = new ArrayList<Runnable>();
BlockingQueue<Runnable> workQueue = executor.getQueue();
workQueue.drainTo(taskList);
/*
* If the queue is a DelayQueue or any other kind of queue
* for which poll or drainTo may fail to remove some elements,
* we need to manually traverse and remove remaining tasks.
* To guarantee atomicity wrt other threads using this queue,
* we need to create a new iterator for each element removed.
*/
while (!workQueue.isEmpty()) {
Iterator<Runnable> it = workQueue.iterator();
try {
if (it.hasNext()) {
Runnable r = it.next();
if (workQueue.remove(r))
taskList.add(r);
}
} catch (ConcurrentModificationException ignore) {
}
}
return taskList;
}
Перед вызовом этого метода вам нужно получить, а затем снять главную блокировку. Для этого вам нужно использовать Java-отражение, потому что поле "mainLock" является приватным. Опять же, вот код:
private Field getMainLock() throws NoSuchFieldException {
Field mainLock = executor.getClass().getDeclaredField("mainLock");
mainLock.setAccessible(true);
return mainLock;
}
Где "executor" - это ваш ThreadPoolExecutor.
Теперь вам нужны методы блокировки / разблокировки:
public void lock() {
try {
Field mainLock = getMainLock();
Method lock = mainLock.getType().getDeclaredMethod("lock", (Class[])null);
lock.invoke(mainLock.get(executor), (Object[])null);
} catch {
...
}
}
public void unlock() {
try {
Field mainLock = getMainLock();
mainLock.setAccessible(true);
Method lock = mainLock.getType().getDeclaredMethod("unlock", (Class[])null);
lock.invoke(mainLock.get(executor), (Object[])null);
} catch {
...
}
}
Наконец, вы можете написать свой метод "setThreadsNumber", и он будет работать как с увеличением, так и уменьшением размера ThreadPoolExecutor:
public void setThreadsNumber(int intValue) {
boolean increasing = intValue > executor.getPoolSize();
executor.setCorePoolSize(intValue);
executor.setMaximumPoolSize(intValue);
if(increasing){
if(drainedQueue != null && (drainedQueue.size() > 0)){
executor.submit(drainedQueue.remove(0));
}
} else {
if(drainedQueue == null){
lock();
drainedQueue = drainQueue();
unlock();
}
}
}
Примечание: очевидно, что если вы выполните N параллельных потоков и измените это число на N-1, все N потоков будут продолжать работать. Когда первый поток заканчивается, новые потоки выполняться не будут. Отныне номер параллельного потока будет тот, который вы выбрали.
Мне тоже нужно было то же решение, и кажется, что в JDK8 setCorePoolSize() и setMaximumPoolSize() действительно дают желаемый результат. Я сделал тестовый пример, в котором я отправляю 4 пула в пул, и они выполняются одновременно, я уменьшаю размер пула во время их работы и отправляю еще один исполняемый файл, который я хочу быть одиноким. Затем я возвращаю бассейн к его первоначальному размеру. Вот тестовый источник https://gist.github.com/southerton81/96e141b8feede3fe0b8f88f679bef381
Он производит следующий вывод (поток "50" - это тот, который должен выполняться изолированно)
run:
test thread 2 enter
test thread 1 enter
test thread 3 enter
test thread 4 enter
test thread 1 exit
test thread 2 exit
test thread 3 exit
test thread 4 exit
test thread 50 enter
test thread 50 exit
test thread 1 enter
test thread 2 enter
test thread 3 enter
test thread 4 enter
test thread 1 exit
test thread 2 exit
test thread 3 exit
test thread 4 exit
Я прочитал документацию по setMaximumPoolSize() и setCorePoolSize(), и кажется, что они могут произвести нужное вам поведение.
-- РЕДАКТИРОВАТЬ --
Мой вывод был неверным: пожалуйста, смотрите обсуждение ниже для деталей...