Java Executor с управлением дросселированием / пропускной способностью

Я ищу Java Executor, который позволил бы мне указать ограничения регулирования / пропускной способности / стимуляции, например, не более, чем, скажем, 100 задач могут быть обработаны в секунду - если передается больше задач, они должны быть поставлены в очередь и выполнены позже. Основная цель этого состоит в том, чтобы избежать ограничений при использовании внешних API или серверов.

Мне интересно, обеспечивает ли это базовая Java (в чем я сомневаюсь, потому что я проверил) или где-то еще надежное (например, Apache Commons), или мне приходится писать свою собственную. Желательно что-то легкое. Я не против написать это сам, но если где-то есть "стандартная" версия, я бы по крайней мере хотел бы сначала взглянуть на нее.

5 ответов

Решение

Посмотрите на guavas RateLimiter:

Ограничитель скорости. Концептуально, ограничитель скорости распределяет разрешения с настраиваемой скоростью. Каждый acqu () блокируется, если необходимо, до получения разрешения, а затем получает его. После получения разрешения разрешение не требуется. Ограничители скорости часто используются для ограничения скорости доступа к некоторым физическим или логическим ресурсам. Это в отличие от семафора, который ограничивает количество одновременных обращений вместо скорости (обратите внимание, что параллелизм и скорость тесно связаны, например, см. Закон Литтла).

Это потокобезопасное, но все же @Beta, В любом случае, стоит попробовать.

Вы должны будете обернуть каждый звонок в Executor по отношению к ограничителю скорости. Для более чистого решения вы можете создать какую-то обертку для ExecutorService,

Из Javadoc:

 final RateLimiter rateLimiter = RateLimiter.create(2.0); // rate is "2 permits per second"
  void submitTasks(List<Runnable> tasks, Executor executor) {
    for (Runnable task : tasks) {
      rateLimiter.acquire(); // may wait
      executor.execute(task);
    }
  }

Java Executor не предлагает такого ограничения, только ограничение по количеству потоков, а это не то, что вы ищете.

В общем, Исполнитель - это неправильное место для ограничения таких действий в любом случае, это должно быть в тот момент, когда Поток пытается вызвать внешний сервер. Например, вы можете сделать это с помощью ограничивающего семафора, в котором потоки ждут, прежде чем отправят свои запросы.

Вызывающая тема:

public void run() {
  // ...
  requestLimiter.acquire();
  connection.send();
  // ...
 }

В то же время вы планируете (один) вторичный поток периодически (как каждые 60 секунд) освобождать полученные ресурсы:

 public void run() {
  // ...
  requestLimiter.drainPermits();  // make sure not more than max are released by draining the Semaphore empty
  requestLimiter.release(MAX_NUM_REQUESTS);
  // ...
 }

не более, чем, скажем, 100 задач могут быть обработаны в секунду - если больше задач отправлено, они должны быть поставлены в очередь и выполнены позже

Вы должны посмотреть в Executors.newFixedThreadPool(int limit), Это позволит вам ограничить количество потоков, которые могут выполняться одновременно. Если вы отправите более одного потока, они будут поставлены в очередь и выполнены позже.

ExecutorService threadPool = Executors.newFixedThreadPool(100);
Future<?> result1 =  threadPool.submit(runnable1);
Future<?> result2 = threadPool.submit(runnable2);
Futurte<SomeClass> result3 = threadPool.submit(callable1);  
...  

Фрагмент выше показывает, как вы будете работать с ExecutorService это позволяет одновременно выполнять не более 100 потоков.

Обновить:
После просмотра комментариев, вот что я придумала (довольно глупо). Как насчет ручного отслеживания потоков, которые должны быть выполнены? Как насчет хранения их сначала в ArrayList а затем отправив их в Executor основанный на том, сколько потоков уже было выполнено за последнюю секунду.
Итак, скажем, 200 задач были переданы в наш ведущий ArrayList Мы можем повторить и добавить 100 к Executor , Когда проходит секунда, мы можем добавить еще несколько потоков в зависимости от того, сколько завершено в Executor и так далее

В зависимости от сценария и, как предлагается в одном из предыдущих ответов, основные функции ThreadPoolExecutor могут помочь.

Но если пул потоков используется несколькими клиентами, и вы хотите ограничить использование каждого из них, чтобы убедиться, что один клиент не будет использовать все потоки, тогда BoundedExecutor выполнит эту работу.

Более подробную информацию можно найти в следующем примере:

http://jcip.net/listings/BoundedExecutor.java

Лично я нашел этот сценарий довольно интересным. В моем случае я хотел подчеркнуть, что интересная фаза для дросселирования является побочной стороной, как в классической параллельной теории производитель / потребитель. Это противоположно некоторым из предложенных ответов ранее. Это значит, что мы не хотим блокировать отправляющий поток, а блокируем потоки-потребители на основе политики скорости (задач / секунду). Таким образом, даже если в очереди есть готовые задачи, выполняющиеся / потребляющие потоки могут блокировать ожидание выполнения политики регулирования.

Тем не менее, я думаю, что хорошим кандидатом был бы Executors.newScheduledThreadPool(int corePoolSize). Таким образом, вам потребуется простая очередь перед исполнителем (подойдет простая LinkedBlockingQueue), а затем запланировать периодическое задание для выбора актуальных задач из очереди (ScheduledExecutorService.scheduleAtFixedRate). Таким образом, это не простое решение, но оно должно работать достаточно хорошо, если вы пытаетесь задушить потребителей, как обсуждалось ранее.

Можно ограничить его внутри Runnable:

public static Runnable throttle (Runnable realRunner, long delay) {
    Runnable throttleRunner = new Runnable() {
        // whether is waiting to run
        private boolean _isWaiting = false;
        // target time to run realRunner
        private long _timeToRun;
        // specified delay time to wait
        private long _delay = delay;
        // Runnable that has the real task to run
        private Runnable _realRunner = realRunner;
        @Override
        public void run() {
            // current time
            long now;
            synchronized (this) {
                // another thread is waiting, skip
                if (_isWaiting) return;
                now = System.currentTimeMillis();
                // update time to run
                // do not update it each time since
                // you do not want to postpone it unlimited
                _timeToRun = now+_delay;
                // set waiting status
                _isWaiting = true;
            }
            try {
                Thread.sleep(_timeToRun-now);

            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                // clear waiting status before run
                _isWaiting = false;
                // do the real task
                _realRunner.run();
            }
        }};
    return throttleRunner;
}

Взять из JAVA Thread Debounce и Throttle

Другие вопросы по тегам