Блок ThreadPoolExecutor, когда очередь заполнена?
Я пытаюсь выполнить много задач, используя ThreadPoolExecutor. Ниже приведен гипотетический пример:
def workQueue = new ArrayBlockingQueue<Runnable>(3, false)
def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue)
for(int i = 0; i < 100000; i++)
threadPoolExecutor.execute(runnable)
Проблема в том, что я быстро получаю исключение java.util.concurrent.RejectedExecutionException, так как число задач превышает размер рабочей очереди. Однако желаемое поведение, которое я ищу, - это иметь блок основного потока, пока в очереди не будет места. Каков наилучший способ сделать это?
10 ответов
В некоторых очень узких обстоятельствах вы можете реализовать java.util.concurrent.RejectedExecutionHandler, который делает то, что вам нужно.
RejectedExecutionHandler block = new RejectedExecutionHandler() {
rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
executor.getQueue().put( r );
}
};
ThreadPoolExecutor pool = new ...
pool.setRejectedExecutionHandler(block);
Сейчас. Это очень плохая идея по следующим причинам
- Он склонен к взаимоблокировке, поскольку все потоки в пуле могут умереть до того, как вещь, которую вы положили в очередь, станет видимой. Смягчить это, установив разумное время поддержки.
- Задача не упакована так, как может ожидать ваш исполнитель. Многие реализации executor оборачивают свои задачи в какой-то объект отслеживания перед выполнением. Посмотрите на свой источник.
- Добавление через getQueue() настоятельно не рекомендуется API и может быть запрещено в какой-то момент.
Практически всегда лучшая стратегия заключается в установке ThreadPoolExecutor.CallerRunsPolicy, который будет ограничивать ваше приложение, выполняя задачу в потоке, который вызывает execute().
Однако иногда стратегия блокирования со всеми присущими ей рисками действительно является тем, что вы хотите. Я бы сказал, в этих условиях
- У вас есть только один поток, вызывающий execute ()
- Вы должны (или хотите) иметь очень маленькую длину очереди
- Вам абсолютно необходимо ограничить количество потоков, выполняющих эту работу (обычно по внешним причинам), и стратегия запуска вызывающих программ нарушит это.
- Ваши задачи имеют непредсказуемый размер, поэтому вызовы могут вызвать голодание, если пул на мгновение был занят 4 короткими задачами, а ваш поток, вызывающий execute, застрял с большим.
Итак, как я сказал. Это редко нужно и может быть опасно, но вот так.
Удачи.
Что вам нужно сделать, это обернуть ваш ThreadPoolExecutor в Executor, который явно ограничивает количество одновременно выполняемых операций внутри него:
private static class BlockingExecutor implements Executor {
final Semaphore semaphore;
final Executor delegate;
private BlockingExecutor(final int concurrentTasksLimit, final Executor delegate) {
semaphore = new Semaphore(concurrentTasksLimit);
this.delegate = delegate;
}
@Override
public void execute(final Runnable command) {
try {
semaphore.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
return;
}
final Runnable wrapped = () -> {
try {
command.run();
} finally {
semaphore.release();
}
};
delegate.execute(wrapped);
}
}
Вы можете настроить concurrentTasksLimit на threadPoolSize + queueSize вашего исполнителя-делегата, и это в значительной степени решит вашу проблему
Вы могли бы использовать semaphore
заблокировать потоки от входа в бассейн.
ExecutorService service = new ThreadPoolExecutor(
3,
3,
1,
TimeUnit.HOURS,
new ArrayBlockingQueue<>(6, false)
);
Semaphore lock = new Semaphore(6); // equal to queue capacity
for (int i = 0; i < 100000; i++ ) {
try {
lock.acquire();
service.submit(() -> {
try {
task.run();
} finally {
lock.release();
}
});
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
Некоторые ошибки:
- Используйте этот шаблон только с фиксированным пулом потоков. Очередь вряд ли будет часто заполняться, поэтому новые потоки создаваться не будут. Ознакомьтесь с документами по Java на ThreadPoolExecutor для получения более подробной информации: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadPoolExecutor.html Есть способ обойти это, но Объем этого ответа.
Размер очереди должен быть больше, чем количество основных потоков. Если бы мы сделали размер очереди 3, в конечном итоге произошло бы следующее:
- T0: все три потока работают, очередь пуста, разрешения отсутствуют.
- T1: поток 1 заканчивается, освобождает разрешение.
- T2: Поток 1 опрашивает очередь на предмет новой работы, не находит ее и ждет.
- T3: основной поток отправляет работу в пул, поток 1 начинает работу.
Вышеприведенный пример переводит в поток основной поток, блокирующий поток 1. Это может показаться небольшим периодом, но теперь умножьте частоту на дни и месяцы. Внезапно, короткие периоды времени складываются в большую потерю времени.
Это то, что я в итоге сделал:
int NUM_THREADS = 6;
Semaphore lock = new Semaphore(NUM_THREADS);
ExecutorService pool = Executors.newCachedThreadPool();
for (int i = 0; i < 100000; i++) {
try {
lock.acquire();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
pool.execute(() -> {
try {
// Task logic
} finally {
lock.release();
}
});
}
Достаточно простой вариант - обернуть BlockingQueue
с реализацией, которая вызывает put(..)
когда offer(..)
вызывается:
public class BlockOnOfferAdapter<T> implements BlockingQueue<T> {
(..)
public boolean offer(E o) {
try {
delegate.put(o);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
return true;
}
(.. implement all other methods simply by delegating ..)
}
Это работает, потому что по умолчанию put(..)
ожидает, пока не заполнится очередь, см.:
/**
* Inserts the specified element into this queue, waiting if necessary
* for space to become available.
*
* @param e the element to add
* @throws InterruptedException if interrupted while waiting
* @throws ClassCastException if the class of the specified element
* prevents it from being added to this queue
* @throws NullPointerException if the specified element is null
* @throws IllegalArgumentException if some property of the specified
* element prevents it from being added to this queue
*/
void put(E e)throws InterruptedException;
Нет ловли RejectedExecutionException
или требуется сложная блокировка.
Вот мой фрагмент кода в этом случае:
public void executeBlocking( Runnable command ) {
if ( threadPool == null ) {
logger.error( "Thread pool '{}' not initialized.", threadPoolName );
return;
}
ThreadPool threadPoolMonitor = this;
boolean accepted = false;
do {
try {
threadPool.execute( new Runnable() {
@Override
public void run() {
try {
command.run();
}
// to make sure that the monitor is freed on exit
finally {
// Notify all the threads waiting for the resource, if any.
synchronized ( threadPoolMonitor ) {
threadPoolMonitor.notifyAll();
}
}
}
} );
accepted = true;
}
catch ( RejectedExecutionException e ) {
// Thread pool is full
try {
// Block until one of the threads finishes its job and exits.
synchronized ( threadPoolMonitor ) {
threadPoolMonitor.wait();
}
}
catch ( InterruptedException ignored ) {
// return immediately
break;
}
}
} while ( !accepted );
}
threadPool - это локальный экземпляр java.util.concurrent.ExecutorService, который уже был инициализирован.
Хорошо, старый поток, но это то, что я нашел, когда искал блокирующего исполнителя потока. Мой код пытается получить семафор, когда задача отправляется в очередь задач. Это блокирует, если не осталось семафоров. Как только задача выполнена, семафор освобождается вместе с декоратором. Страшно то, что существует вероятность потери семафоров, но это можно решить, например, с помощью задания по времени, которое просто очищает семафоры по времени.
Итак, вот мое решение:
class BlockingThreadPoolTaskExecutor(concurrency: Int) : ThreadPoolTaskExecutor() {
companion object {
lateinit var semaphore: Semaphore
}
init {
semaphore = Semaphore(concurrency)
val semaphoreTaskDecorator = SemaphoreTaskDecorator()
this.setTaskDecorator(semaphoreTaskDecorator)
}
override fun <T> submit(task: Callable<T>): Future<T> {
log.debug("submit")
semaphore.acquire()
return super.submit(task)
}
}
private class SemaphoreTaskDecorator : TaskDecorator {
override fun decorate(runnable: Runnable): Runnable {
log.debug("decorate")
return Runnable {
try {
runnable.run()
} finally {
log.debug("decorate done")
semaphore.release()
}
}
}
}
Можно использовать LinkedBlockingQueue с ThreadPoolExecutor:
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
keepAliveTime, unit, new LinkedBlockingQueue<Runnable>(YOUR_QUEUE_SIZE));
Не беспокойтесь о вызове ThreadPoolExecutor
add(e)
или вместо ; ThreadPoolExecutor знает, когда использовать тот или иной. ThreadPoolExecutor предполагает, что имеет дело с BlockingQueue , не более и не менее, поэтому он не будет вызывать, например
offer(e)
вместо
put(e)
потому что это означает полагаться на тот факт, что предоставленный
BlockingQueue
неограничен, что может быть не так.
Я решил эту проблему с помощью пользовательского RejectedExecutionHandler, который на некоторое время просто блокирует вызывающий поток, а затем снова пытается отправить задачу:
public class BlockWhenQueueFull implements RejectedExecutionHandler {
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
// The pool is full. Wait, then try again.
try {
long waitMs = 250;
Thread.sleep(waitMs);
} catch (InterruptedException interruptedException) {}
executor.execute(r);
}
}
Этот класс можно просто использовать в исполнителе пула потоков как RejectedExecutionHandler, как и любой другой. В этом примере:
executorPool = new def threadPoolExecutor = new ThreadPoolExecutor(3, 3, 1L, TimeUnit.HOURS, workQueue, new BlockWhenQueueFull())
Единственным недостатком, который я вижу, является то, что вызывающий поток может быть заблокирован немного дольше, чем это строго необходимо (до 250 мс). Для многих краткосрочных задач, возможно, уменьшите время ожидания до 10 мс или около того. Более того, поскольку этот исполнитель фактически вызывается рекурсивно, очень долгое ожидание появления потока (часы) может привести к переполнению стека.
Тем не менее, мне лично нравится этот метод. Он компактен, прост для понимания и хорошо работает. Я что-то упустил?
Вы можете реализовать RejectedTaskHandler и получить все отклоненные задачи, когда размер очереди заполнен. По умолчанию у исполнителей есть политика отмены, поэтому вы можете добавить эту задачу обратно в очередь из обработчика или любого другого выбора.
public class ExecutorRejectedTaskHandlerFixedThreadPool {
public static void main(String[] args) throws InterruptedException {
//maximum queue size : 2
BlockingQueue<Runnable> blockingQueue =
new LinkedBlockingQueue<Runnable>(2);
CustomThreadPoolExecutor executor =
new CustomThreadPoolExecutor(4, 5, 5, TimeUnit.SECONDS,
blockingQueue);
RejectedTaskHandler rejectedHandler = new RejectedTaskHandler();
executor.setRejectedExecutionHandler(rejectedHandler);
//submit 20 the tasks for execution
//Note: only 7 tasks(5-max pool size + 2-queue size) will be executed and rest will be rejected as queue will be overflowed
for (int i = 0; i < 20; i++) {
executor.execute(new Task());
}
System.out.println("Thread name " + Thread.currentThread().getName());
}
static class Task implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Thread - " + Thread.currentThread().getName() + " performing it's job");
}
}
static class RejectedTaskHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("Task rejected" + r.toString());
}
}
public static class CustomThreadPoolExecutor extends ThreadPoolExecutor {
public CustomThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
long keepAliveTime, TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
}
@Override
protected void afterExecute(Runnable r, Throwable t) {
super.afterExecute(r, t);
}
}
}