Вызов метода регулирования на M запросов за N секунд
Мне нужен компонент / класс, который ограничивает выполнение какого-либо метода до максимума M вызовов в N секунд (или мс или нанос, не имеет значения).
Другими словами, мне нужно убедиться, что мой метод выполняется не более M раз в скользящем окне из N секунд.
Если вы не знаете существующий класс, не стесняйтесь публиковать свои решения / идеи, как бы вы это реализовали.
19 ответов
Я бы использовал кольцевой буфер временных меток с фиксированным размером M. Каждый раз, когда вызывается метод, вы проверяете самую старую запись, и если она меньше N секунд в прошлом, вы выполняете и добавляете другую запись, иначе вы спите для разницы во времени.
То, что работало из коробки для меня, было Google Guava RateLimiter.
// Allow one request per second
private RateLimiter throttle = RateLimiter.create(1.0);
private void someMethod() {
throttle.acquire();
// Do something
}
Конкретно, вы должны быть в состоянии реализовать это с DelayQueue
, Инициализировать очередь с помощью M
Delayed
экземпляры с их задержкой изначально установлены на ноль. Когда приходят запросы к методу, take
токен, который заставляет метод блокироваться до тех пор, пока не будет выполнено требование регулирования. Когда токен был взят, add
новый токен в очередь с задержкой N
,
Читайте об алгоритме Token Bucket. По сути, у вас есть ведро с жетонами в нем. Каждый раз, когда вы выполняете метод, вы берете токен. Если токенов больше нет, вы блокируете их, пока не получите. Между тем, существует какой-то внешний субъект, который пополняет токены с фиксированным интервалом.
Я не знаю библиотеки для этого (или чего-то подобного). Вы можете написать эту логику в своем коде или использовать AspectJ для добавления поведения.
Если вам нужен ограничитель скорости скользящего окна на основе Java, который будет работать в распределенной системе, вы можете взглянуть на проект https://github.com/mokies/ratelimitj.
Конфигурация с поддержкой Redis, ограничивающая количество запросов по IP до 50 в минуту, будет выглядеть следующим образом:
import com.lambdaworks.redis.RedisClient;
import es.moki.ratelimitj.core.LimitRule;
RedisClient client = RedisClient.create("redis://localhost");
Set<LimitRule> rules = Collections.singleton(LimitRule.of(1, TimeUnit.MINUTES, 50)); // 50 request per minute, per key
RedisRateLimit requestRateLimiter = new RedisRateLimit(client, rules);
boolean overLimit = requestRateLimiter.overLimit("ip:127.0.0.2");
См. https://github.com/mokies/ratelimitj/tree/master/ratelimitj-redis дополнительной информации о конфигурации Redis.
Моя реализация ниже может обрабатывать произвольную точность времени запроса, она имеет O(1) временную сложность для каждого запроса, не требует никакого дополнительного буфера, например O(1) пространственной сложности, кроме того, он не требует фонового потока для выпуска токена, вместо этого токены выпускаются в соответствии с временем, прошедшим с момента последнего запроса.
class RateLimiter {
int limit;
double available;
long interval;
long lastTimeStamp;
RateLimiter(int limit, long interval) {
this.limit = limit;
this.interval = interval;
available = 0;
lastTimeStamp = System.currentTimeMillis();
}
synchronized boolean canAdd() {
long now = System.currentTimeMillis();
// more token are released since last request
available += (now-lastTimeStamp)*1.0/interval*limit;
if (available>limit)
available = limit;
if (available<1)
return false;
else {
available--;
lastTimeStamp = now;
return true;
}
}
}
Это зависит от приложения.
Представьте себе случай, когда несколько потоков хотят, чтобы токен выполнял какое-то глобально ограниченное по скорости действие, при котором пакет не был разрешен (т.е. вы хотите ограничить 10 действий в 10 секунд, но вы не хотите, чтобы 10 действий происходили в первую секунду, а затем оставались 9 секунд остановились).
У DelayedQueue есть недостаток: порядок, в котором потоки запрашивают токены, может не соответствовать порядку, в котором они получают свой запрос. Если несколько потоков заблокированы в ожидании токена, неясно, какой из них возьмет следующий доступный токен. С моей точки зрения, вы могли бы даже ожидать вечных потоков.
Одним из решений является минимальный интервал времени между двумя последовательными действиями и выполнение действий в том же порядке, в котором они были запрошены.
Вот реализация:
public class LeakyBucket {
protected float maxRate;
protected long minTime;
//holds time of last action (past or future!)
protected long lastSchedAction = System.currentTimeMillis();
public LeakyBucket(float maxRate) throws Exception {
if(maxRate <= 0.0f) {
throw new Exception("Invalid rate");
}
this.maxRate = maxRate;
this.minTime = (long)(1000.0f / maxRate);
}
public void consume() throws InterruptedException {
long curTime = System.currentTimeMillis();
long timeLeft;
//calculate when can we do the action
synchronized(this) {
timeLeft = lastSchedAction + minTime - curTime;
if(timeLeft > 0) {
lastSchedAction += minTime;
}
else {
lastSchedAction = curTime;
}
}
//If needed, wait for our time
if(timeLeft <= 0) {
return;
}
else {
Thread.sleep(timeLeft);
}
}
}
Мне нужно убедиться, что мой метод выполняется не более M раз в скользящем окне N секунд.
Недавно я написал сообщение в блоге о том, как сделать это в.NET. Возможно, вы сможете создать нечто подобное в Java.
Хотя это не то, что вы спросили, ThreadPoolExecutor
, который предназначен для ограничения M одновременных запросов вместо M запросов в N секунд, также может быть полезным.
Я реализовал простой алгоритм регулирования. Попробуйте эту ссылку, http://krishnaprasadas.blogspot.in/2012/05/throttling-algorithm.html
Кратко об алгоритме,
Этот алгоритм использует возможности Java Delayed Queue. Создайте задержанный объект с ожидаемой задержкой (здесь 1000/M для миллисекунды TimeUnit). Поместите тот же объект в отложенную очередь, которая будет интерном для нас. Затем перед каждым вызовом метода возьмите объект из очереди, take является блокирующим вызовом, который будет возвращаться только после указанной задержки, и после вызова метода не забудьте поместить объект в очередь с обновленным временем (здесь текущие миллисекунды),
Здесь мы также можем иметь несколько задержанных объектов с различной задержкой. Этот подход также обеспечит высокую пропускную способность.
Исходный вопрос очень похож на проблему, решаемую в этом посте: многоканальный асинхронный дроссель Java.
Для скорости M вызовов в N секунд регулятор, обсуждаемый в этом блоге, гарантирует, что любой интервал длины N на временной шкале не будет содержать более M вызовов.
Это обновление кода LeakyBucket выше. Это работает для более чем 1000 запросов в секунду.
import lombok.SneakyThrows;
import java.util.concurrent.TimeUnit;
class LeakyBucket {
private long minTimeNano; // sec / billion
private long sched = System.nanoTime();
/**
* Create a rate limiter using the leakybucket alg.
* @param perSec the number of requests per second
*/
public LeakyBucket(double perSec) {
if (perSec <= 0.0) {
throw new RuntimeException("Invalid rate " + perSec);
}
this.minTimeNano = (long) (1_000_000_000.0 / perSec);
}
@SneakyThrows public void consume() {
long curr = System.nanoTime();
long timeLeft;
synchronized (this) {
timeLeft = sched - curr + minTimeNano;
sched += minTimeNano;
}
if (timeLeft <= minTimeNano) {
return;
}
TimeUnit.NANOSECONDS.sleep(timeLeft);
}
}
и unittest для выше:
import com.google.common.base.Stopwatch;
import org.junit.Ignore;
import org.junit.Test;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class LeakyBucketTest {
@Test @Ignore public void t() {
double numberPerSec = 10000;
LeakyBucket b = new LeakyBucket(numberPerSec);
Stopwatch w = Stopwatch.createStarted();
IntStream.range(0, (int) (numberPerSec * 5)).parallel().forEach(
x -> b.consume());
System.out.printf("%,d ms%n", w.elapsed(TimeUnit.MILLISECONDS));
}
}
Мое решение: простой метод утилиты, вы можете изменить его, чтобы создать класс-оболочку.
public static Runnable throttle (Runnable realRunner, long delay) {
Runnable throttleRunner = new Runnable() {
// whether is waiting to run
private boolean _isWaiting = false;
// target time to run realRunner
private long _timeToRun;
// specified delay time to wait
private long _delay = delay;
// Runnable that has the real task to run
private Runnable _realRunner = realRunner;
@Override
public void run() {
// current time
long now;
synchronized (this) {
// another thread is waiting, skip
if (_isWaiting) return;
now = System.currentTimeMillis();
// update time to run
// do not update it each time since
// you do not want to postpone it unlimited
_timeToRun = now+_delay;
// set waiting status
_isWaiting = true;
}
try {
Thread.sleep(_timeToRun-now);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// clear waiting status before run
_isWaiting = false;
// do the real task
_realRunner.run();
}
}};
return throttleRunner;
}
Взять из JAVA Thread Debounce и Throttle
Вот реализация ограничителя скорости, основанная на @tonywl (и отчасти связанная с дырявым ведром Дуарте Менезеса). Идея та же — используйте «пул токенов», чтобы разрешить как ограничение скорости, так и взрыв (выполнение нескольких вызовов за короткое время после небольшого простоя).
Эта реализация предлагает два основных отличия:
- Параллельный доступ без блокировки с использованием атомарных операций.
- Вместо того, чтобы блокировать запрос, рассчитайте задержку, необходимую для обеспечения ограничения скорости, и предложите это в качестве ответа, чтобы позволить вызывающей стороне принудительно применить задержку — это будет лучше работать с асинхронным программированием, которое вы можете найти в современных сетевых средах.
Полную реализацию с документацией можно найти в этом Github Gist, где я также буду публиковать обновления, но вот суть:
import java.util.concurrent.atomic.AtomicLong;
public class RateLimiter {
private final static long TOKEN_SIZE = 1_000_000 /* tockins per token */;
private final double tokenRate; // measured in tokens per ms
private final double tockinRate; // measured in tockins per ms
private final long tockinsLimit;
private AtomicLong available;
private AtomicLong lastTimeStamp;
public RateLimiter(int prefill, int limit, int fill, long interval) {
this.tokenRate = (double)fill / interval;
this.tockinsLimit = TOKEN_SIZE * limit;
this.tockinRate = tokenRate * TOKEN_SIZE;
this.lastTimeStamp = new AtomicLong(System.nanoTime());
this.available = new AtomicLong(Math.max(prefill, limit) * TOKEN_SIZE);
}
public boolean allowRequest() {
return whenNextAllowed(1, false) == 0;
}
public boolean allowRequest(int cost) {
return whenNextAllowed(cost, false) == 0;
}
public long whenNextAllowed(boolean alwaysConsume) {
return whenNextAllowed(1, alwaysConsume);
}
/**
* Check when will the next call be allowed, according to the specified rate.
* The value returned is in milliseconds. If the result is 0 - or if {@code alwaysConsume} was
* specified then the RateLimiter has recorded that the call has been allowed.
* @param cost How costly is the requested action. The base rate is 1 token per request,
* but the client can declare a more costly action that consumes more tokens.
* @param alwaysConsume if set to {@code true} this method assumes that the caller will delay
* the action that is rate limited but will perform it without checking again - so it will
* consume the specified number of tokens as if the action has gone through. This means that
* the pool can get into a deficit, which will further delay additional actions.
* @return how long before this request should be let through.
*/
public long whenNextAllowed(int cost, boolean alwaysConsume) {
var now = System.nanoTime();
var last = lastTimeStamp.getAndSet(now);
// calculate how many tockins we got since last call
// if the previous call was less than a microsecond ago, we still accumulate at least
// one tockin, which is probably more than we should, but this is too small to matter - right?
var add = (long)Math.ceil(tokenRate * (now - last));
var nowAvailable = available.addAndGet(add);
while (nowAvailable > tockinsLimit) {
available.compareAndSet(nowAvailable, tockinsLimit);
nowAvailable = available.get();
}
// answer the question
var toWait = (long)Math.ceil(Math.max(0, (TOKEN_SIZE - nowAvailable) / tockinRate));
if (alwaysConsume || toWait == 0) // the caller will let the request go through, so consume a token now
available.addAndGet(-TOKEN_SIZE);
return toWait;
}
}
Apache Camel также поддерживает механизм Throttler:
from("seda:a").throttle(100).asyncDelayed().to("seda:b");
Вы можете использовать Redis для этого, когда блокировка необходима в распределенной системе. Второй алгоритм в https://redis.io/commands/incr
Вот немного продвинутая версия простого ограничителя скорости
/**
* Simple request limiter based on Thread.sleep method.
* Create limiter instance via {@link #create(float)} and call {@link #consume()} before making any request.
* If the limit is exceeded cosume method locks and waits for current call rate to fall down below the limit
*/
public class RequestRateLimiter {
private long minTime;
private long lastSchedAction;
private double avgSpent = 0;
ArrayList<RatePeriod> periods;
@AllArgsConstructor
public static class RatePeriod{
@Getter
private LocalTime start;
@Getter
private LocalTime end;
@Getter
private float maxRate;
}
/**
* Create request limiter with maxRate - maximum number of requests per second
* @param maxRate - maximum number of requests per second
* @return
*/
public static RequestRateLimiter create(float maxRate){
return new RequestRateLimiter(Arrays.asList( new RatePeriod(LocalTime.of(0,0,0),
LocalTime.of(23,59,59), maxRate)));
}
/**
* Create request limiter with ratePeriods calendar - maximum number of requests per second in every period
* @param ratePeriods - rate calendar
* @return
*/
public static RequestRateLimiter create(List<RatePeriod> ratePeriods){
return new RequestRateLimiter(ratePeriods);
}
private void checkArgs(List<RatePeriod> ratePeriods){
for (RatePeriod rp: ratePeriods ){
if ( null == rp || rp.maxRate <= 0.0f || null == rp.start || null == rp.end )
throw new IllegalArgumentException("list contains null or rate is less then zero or period is zero length");
}
}
private float getCurrentRate(){
LocalTime now = LocalTime.now();
for (RatePeriod rp: periods){
if ( now.isAfter( rp.start ) && now.isBefore( rp.end ) )
return rp.maxRate;
}
return Float.MAX_VALUE;
}
private RequestRateLimiter(List<RatePeriod> ratePeriods){
checkArgs(ratePeriods);
periods = new ArrayList<>(ratePeriods.size());
periods.addAll(ratePeriods);
this.minTime = (long)(1000.0f / getCurrentRate());
this.lastSchedAction = System.currentTimeMillis() - minTime;
}
/**
* Call this method before making actual request.
* Method call locks until current rate falls down below the limit
* @throws InterruptedException
*/
public void consume() throws InterruptedException {
long timeLeft;
synchronized(this) {
long curTime = System.currentTimeMillis();
minTime = (long)(1000.0f / getCurrentRate());
timeLeft = lastSchedAction + minTime - curTime;
long timeSpent = curTime - lastSchedAction + timeLeft;
avgSpent = (avgSpent + timeSpent) / 2;
if(timeLeft <= 0) {
lastSchedAction = curTime;
return;
}
lastSchedAction = curTime + timeLeft;
}
Thread.sleep(timeLeft);
}
public synchronized float getCuRate(){
return (float) ( 1000d / avgSpent);
}
}
И юнит-тесты
import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class RequestRateLimiterTest {
@Test(expected = IllegalArgumentException.class)
public void checkSingleThreadZeroRate(){
// Zero rate
RequestRateLimiter limiter = RequestRateLimiter.create(0);
try {
limiter.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Test
public void checkSingleThreadUnlimitedRate(){
// Unlimited
RequestRateLimiter limiter = RequestRateLimiter.create(Float.MAX_VALUE);
long started = System.currentTimeMillis();
for ( int i = 0; i < 1000; i++ ){
try {
limiter.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long ended = System.currentTimeMillis();
System.out.println( "Current rate:" + limiter.getCurRate() );
Assert.assertTrue( ((ended - started) < 1000));
}
@Test
public void rcheckSingleThreadRate(){
// 3 request per minute
RequestRateLimiter limiter = RequestRateLimiter.create(3f/60f);
long started = System.currentTimeMillis();
for ( int i = 0; i < 3; i++ ){
try {
limiter.consume();
Thread.sleep(20000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long ended = System.currentTimeMillis();
System.out.println( "Current rate:" + limiter.getCurRate() );
Assert.assertTrue( ((ended - started) >= 60000 ) & ((ended - started) < 61000));
}
@Test
public void checkSingleThreadRateLimit(){
// 100 request per second
RequestRateLimiter limiter = RequestRateLimiter.create(100);
long started = System.currentTimeMillis();
for ( int i = 0; i < 1000; i++ ){
try {
limiter.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long ended = System.currentTimeMillis();
System.out.println( "Current rate:" + limiter.getCurRate() );
Assert.assertTrue( (ended - started) >= ( 10000 - 100 ));
}
@Test
public void checkMultiThreadedRateLimit(){
// 100 request per second
RequestRateLimiter limiter = RequestRateLimiter.create(100);
long started = System.currentTimeMillis();
List<Future<?>> tasks = new ArrayList<>(10);
ExecutorService exec = Executors.newFixedThreadPool(10);
for ( int i = 0; i < 10; i++ ) {
tasks.add( exec.submit(() -> {
for (int i1 = 0; i1 < 100; i1++) {
try {
limiter.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) );
}
tasks.stream().forEach( future -> {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
long ended = System.currentTimeMillis();
System.out.println( "Current rate:" + limiter.getCurRate() );
Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
}
@Test
public void checkMultiThreaded32RateLimit(){
// 0,2 request per second
RequestRateLimiter limiter = RequestRateLimiter.create(0.2f);
long started = System.currentTimeMillis();
List<Future<?>> tasks = new ArrayList<>(8);
ExecutorService exec = Executors.newFixedThreadPool(8);
for ( int i = 0; i < 8; i++ ) {
tasks.add( exec.submit(() -> {
for (int i1 = 0; i1 < 2; i1++) {
try {
limiter.consume();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) );
}
tasks.stream().forEach( future -> {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
long ended = System.currentTimeMillis();
System.out.println( "Current rate:" + limiter.getCurRate() );
Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
}
@Test
public void checkMultiThreadedRateLimitDynamicRate(){
// 100 request per second
RequestRateLimiter limiter = RequestRateLimiter.create(100);
long started = System.currentTimeMillis();
List<Future<?>> tasks = new ArrayList<>(10);
ExecutorService exec = Executors.newFixedThreadPool(10);
for ( int i = 0; i < 10; i++ ) {
tasks.add( exec.submit(() -> {
Random r = new Random();
for (int i1 = 0; i1 < 100; i1++) {
try {
limiter.consume();
Thread.sleep(r.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}) );
}
tasks.stream().forEach( future -> {
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
});
long ended = System.currentTimeMillis();
System.out.println( "Current rate:" + limiter.getCurRate() );
Assert.assertTrue( (ended - started) >= ( 10000 - 100 ) );
}
}
Попробуйте использовать этот простой подход:
public class SimpleThrottler {
private static final int T = 1; // min
private static final int N = 345;
private Lock lock = new ReentrantLock();
private Condition newFrame = lock.newCondition();
private volatile boolean currentFrame = true;
public SimpleThrottler() {
handleForGate();
}
/**
* Payload
*/
private void job() {
try {
Thread.sleep(Math.abs(ThreadLocalRandom.current().nextLong(12, 98)));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.err.print(" J. ");
}
public void doJob() throws InterruptedException {
lock.lock();
try {
while (true) {
int count = 0;
while (count < N && currentFrame) {
job();
count++;
}
newFrame.await();
currentFrame = true;
}
} finally {
lock.unlock();
}
}
public void handleForGate() {
Thread handler = new Thread(() -> {
while (true) {
try {
Thread.sleep(1 * 900);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
currentFrame = false;
lock.lock();
try {
newFrame.signal();
} finally {
lock.unlock();
}
}
}
});
handler.start();
}
}