Java Executor с управлением дросселированием / пропускной способностью
Я ищу Java Executor, который позволил бы мне указать ограничения регулирования / пропускной способности / стимуляции, например, не более, чем, скажем, 100 задач могут быть обработаны в секунду - если передается больше задач, они должны быть поставлены в очередь и выполнены позже. Основная цель этого состоит в том, чтобы избежать ограничений при использовании внешних API или серверов.
Мне интересно, обеспечивает ли это базовая Java (в чем я сомневаюсь, потому что я проверил) или где-то еще надежное (например, Apache Commons), или мне приходится писать свою собственную. Желательно что-то легкое. Я не против написать это сам, но если где-то есть "стандартная" версия, я бы по крайней мере хотел бы сначала взглянуть на нее.
5 ответов
Посмотрите на guavas RateLimiter:
Ограничитель скорости. Концептуально, ограничитель скорости распределяет разрешения с настраиваемой скоростью. Каждый acqu () блокируется, если необходимо, до получения разрешения, а затем получает его. После получения разрешения разрешение не требуется. Ограничители скорости часто используются для ограничения скорости доступа к некоторым физическим или логическим ресурсам. Это в отличие от семафора, который ограничивает количество одновременных обращений вместо скорости (обратите внимание, что параллелизм и скорость тесно связаны, например, см. Закон Литтла).
Это потокобезопасное, но все же @Beta
, В любом случае, стоит попробовать.
Вы должны будете обернуть каждый звонок в Executor
по отношению к ограничителю скорости. Для более чистого решения вы можете создать какую-то обертку для ExecutorService
,
Из Javadoc:
final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
void submitTasks(List<Runnable> tasks, Executor executor) {
for (Runnable task : tasks) {
rateLimiter.acquire(); // may wait
executor.execute(task);
}
}
Java Executor не предлагает такого ограничения, только ограничение по количеству потоков, а это не то, что вы ищете.
В общем, Исполнитель - это неправильное место для ограничения таких действий в любом случае, это должно быть в тот момент, когда Поток пытается вызвать внешний сервер. Например, вы можете сделать это с помощью ограничивающего семафора, в котором потоки ждут, прежде чем отправят свои запросы.
Вызывающая тема:
public void run() {
// ...
requestLimiter.acquire();
connection.send();
// ...
}
В то же время вы планируете (один) вторичный поток периодически (как каждые 60 секунд) освобождать полученные ресурсы:
public void run() {
// ...
requestLimiter.drainPermits(); // make sure not more than max are released by draining the Semaphore empty
requestLimiter.release(MAX_NUM_REQUESTS);
// ...
}
не более, чем, скажем, 100 задач могут быть обработаны в секунду - если больше задач отправлено, они должны быть поставлены в очередь и выполнены позже
Вы должны посмотреть в Executors.newFixedThreadPool(int limit)
, Это позволит вам ограничить количество потоков, которые могут выполняться одновременно. Если вы отправите более одного потока, они будут поставлены в очередь и выполнены позже.
ExecutorService threadPool = Executors.newFixedThreadPool(100);
Future<?> result1 = threadPool.submit(runnable1);
Future<?> result2 = threadPool.submit(runnable2);
Futurte<SomeClass> result3 = threadPool.submit(callable1);
...
Фрагмент выше показывает, как вы будете работать с ExecutorService
это позволяет одновременно выполнять не более 100 потоков.
Обновить:
После просмотра комментариев, вот что я придумала (довольно глупо). Как насчет ручного отслеживания потоков, которые должны быть выполнены? Как насчет хранения их сначала в ArrayList
а затем отправив их в Executor
основанный на том, сколько потоков уже было выполнено за последнюю секунду.
Итак, скажем, 200 задач были переданы в наш ведущий ArrayList
Мы можем повторить и добавить 100 к Executor
, Когда проходит секунда, мы можем добавить еще несколько потоков в зависимости от того, сколько завершено в Executor
и так далее
В зависимости от сценария и, как предлагается в одном из предыдущих ответов, основные функции ThreadPoolExecutor могут помочь.
Но если пул потоков используется несколькими клиентами, и вы хотите ограничить использование каждого из них, чтобы убедиться, что один клиент не будет использовать все потоки, тогда BoundedExecutor выполнит эту работу.
Более подробную информацию можно найти в следующем примере:
Лично я нашел этот сценарий довольно интересным. В моем случае я хотел подчеркнуть, что интересная фаза для дросселирования является побочной стороной, как в классической параллельной теории производитель / потребитель. Это противоположно некоторым из предложенных ответов ранее. Это значит, что мы не хотим блокировать отправляющий поток, а блокируем потоки-потребители на основе политики скорости (задач / секунду). Таким образом, даже если в очереди есть готовые задачи, выполняющиеся / потребляющие потоки могут блокировать ожидание выполнения политики регулирования.
Тем не менее, я думаю, что хорошим кандидатом был бы Executors.newScheduledThreadPool(int corePoolSize). Таким образом, вам потребуется простая очередь перед исполнителем (подойдет простая LinkedBlockingQueue), а затем запланировать периодическое задание для выбора актуальных задач из очереди (ScheduledExecutorService.scheduleAtFixedRate). Таким образом, это не простое решение, но оно должно работать достаточно хорошо, если вы пытаетесь задушить потребителей, как обсуждалось ранее.
Можно ограничить его внутри Runnable:
public static Runnable throttle (Runnable realRunner, long delay) {
Runnable throttleRunner = new Runnable() {
// whether is waiting to run
private boolean _isWaiting = false;
// target time to run realRunner
private long _timeToRun;
// specified delay time to wait
private long _delay = delay;
// Runnable that has the real task to run
private Runnable _realRunner = realRunner;
@Override
public void run() {
// current time
long now;
synchronized (this) {
// another thread is waiting, skip
if (_isWaiting) return;
now = System.currentTimeMillis();
// update time to run
// do not update it each time since
// you do not want to postpone it unlimited
_timeToRun = now+_delay;
// set waiting status
_isWaiting = true;
}
try {
Thread.sleep(_timeToRun-now);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// clear waiting status before run
_isWaiting = false;
// do the real task
_realRunner.run();
}
}};
return throttleRunner;
}
Взять из JAVA Thread Debounce и Throttle