Что такое хороший алгоритм ограничения скорости?

Я мог бы использовать некоторый псевдокод или, лучше сказать, Python. Я пытаюсь реализовать очередь ограничения скорости для бота Python IRC, и она частично работает, но если кто-то запускает меньше сообщений, чем предел (например, ограничение скорости составляет 5 сообщений в 8 секунд, а человек запускает только 4), и следующий триггер длится более 8 секунд (например, через 16 секунд), бот отправляет сообщение, но очередь заполняется, и бот ждет 8 секунд, даже если это не нужно, поскольку истек 8-секундный период.

12 ответов

Решение

Вот самый простой алгоритм, если вы хотите просто отбрасывать сообщения, когда они приходят слишком быстро (вместо того, чтобы ставить их в очередь, что имеет смысл, потому что очередь может стать произвольно большой):

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    discard_message();
  else:
    forward_message();
    allowance -= 1.0;

В этом решении нет структур данных, таймеров и т. Д., И оно работает чисто:). Чтобы увидеть это, "допуск" растет со скоростью не более 5/8 единиц в секунду, то есть не более пяти единиц в восемь секунд. Каждое пересылаемое сообщение удерживает одну единицу, поэтому вы не можете отправлять более пяти сообщений каждые восемь секунд.

Обратите внимание, что rate должно быть целым числом, то есть без ненулевой десятичной части, иначе алгоритм не будет работать правильно (фактическая скорость не будет rate/per). Например rate=0.5; per=1.0; не работает, потому что allowance никогда не вырастет до 1,0. Но rate=1.0; per=2.0; работает отлично.

Используйте этот декоратор @RateLimited(ratepersec) перед вашей функцией, которая ставит в очередь.

По сути, это проверяет, прошло ли 1/ скорость в секундах с прошлого раза, и если нет, ждет оставшуюся часть времени, в противном случае он не ждет. Это эффективно ограничивает вас, чтобы оценить / сек. Декоратор может быть применен к любой функции, которую вы хотите ограничить по скорости.

В вашем случае, если вы хотите максимум 5 сообщений за 8 секунд, используйте @RateLimited(0.625) перед функцией sendToQueue.

import time

def RateLimited(maxPerSecond):
    minInterval = 1.0 / float(maxPerSecond)
    def decorate(func):
        lastTimeCalled = [0.0]
        def rateLimitedFunction(*args,**kargs):
            elapsed = time.clock() - lastTimeCalled[0]
            leftToWait = minInterval - elapsed
            if leftToWait>0:
                time.sleep(leftToWait)
            ret = func(*args,**kargs)
            lastTimeCalled[0] = time.clock()
            return ret
        return rateLimitedFunction
    return decorate

@RateLimited(2)  # 2 per second at most
def PrintNumber(num):
    print num

if __name__ == "__main__":
    print "This should print 1,2,3... at about 2 per second."
    for i in range(1,100):
        PrintNumber(i)

Token Bucket довольно прост в реализации.

Начните с ведра с 5 жетонов.

Каждые 5/8 секунд: если в ведре меньше 5 жетонов, добавьте один.

Каждый раз, когда вы хотите отправить сообщение: если в корзине есть токен ≥1, выньте один токен и отправьте сообщение. В противном случае, подождите / отбросьте сообщение / что угодно.

(очевидно, что в реальном коде вы бы использовали целочисленный счетчик вместо реальных токенов, и вы можете оптимизировать каждый шаг 5/8, сохраняя временные метки)


Повторное чтение вопроса, если ограничение скорости полностью сбрасывается каждые 8 ​​секунд, то вот модификация:

Начните с отметки времени, last_sendДавным-давно (например, в эпоху). Кроме того, начните с той же корзины с 5 токенами.

Ударьте правило каждые 5/8 секунд.

Каждый раз, когда вы отправляете сообщение: сначала проверьте, last_send ≥ 8 секунд назад. Если так, заполните ведро (установите это к 5 жетонам). Во-вторых, если в корзине есть токены, отправьте сообщение (в противном случае отбросьте / подождите / и т.д.). В-третьих, установить last_send к данному моменту.

Это должно работать для этого сценария.


Я на самом деле написал IRC-бот, используя такую ​​стратегию (первый подход). Это на Perl, а не на Python, но вот некоторый код для иллюстрации:

Первая часть здесь посвящена добавлению токенов в корзину. Вы можете увидеть оптимизацию добавления токенов на основе времени (от 2-й до последней строки), а затем последняя строка ограничивает содержимое сегмента до максимума (MESSAGE_BURST)

    my $start_time = time;
    ...
    # Bucket handling
    my $bucket = $conn->{fujiko_limit_bucket};
    my $lasttx = $conn->{fujiko_limit_lasttx};
    $bucket += ($start_time-$lasttx)/MESSAGE_INTERVAL;
    ($bucket <= MESSAGE_BURST) or $bucket = MESSAGE_BURST;

$conn - это структура данных, которая передается. Это внутри метода, который выполняется регулярно (он рассчитывает, когда в следующий раз ему будет что-то делать, и спит либо так долго, либо до тех пор, пока не получит сетевой трафик). Следующая часть метода обрабатывает отправку. Это довольно сложно, потому что сообщения имеют приоритеты, связанные с ними.

    # Queue handling. Start with the ultimate queue.
    my $queues = $conn->{fujiko_queues};
    foreach my $entry (@{$queues->[PRIORITY_ULTIMATE]}) {
            # Ultimate is special. We run ultimate no matter what. Even if
            # it sends the bucket negative.
            --$bucket;
            $entry->{code}(@{$entry->{args}});
    }
    $queues->[PRIORITY_ULTIMATE] = [];

Это первая очередь, которая запускается несмотря ни на что. Даже если это уничтожит нашу связь из-за наводнения. Используется для чрезвычайно важных вещей, например, для ответа на PING сервера. Далее остальные очереди:

    # Continue to the other queues, in order of priority.
    QRUN: for (my $pri = PRIORITY_HIGH; $pri >= PRIORITY_JUNK; --$pri) {
            my $queue = $queues->[$pri];
            while (scalar(@$queue)) {
                    if ($bucket < 1) {
                            # continue later.
                            $need_more_time = 1;
                            last QRUN;
                    } else {
                            --$bucket;
                            my $entry = shift @$queue;
                            $entry->{code}(@{$entry->{args}});
                    }
            }
    }

Наконец, статус корзины сохраняется обратно в структуру данных $conn (на самом деле чуть позже в методе; сначала он вычисляет, как скоро у него будет больше работы)

    # Save status.
    $conn->{fujiko_limit_bucket} = $bucket;
    $conn->{fujiko_limit_lasttx} = $start_time;

Как видите, реальный код обработки сегментов очень мал - около четырех строк. Остальная часть кода является приоритетной обработкой очереди. У бота есть приоритетные очереди, так что, например, кто-то, общаясь с ним, не может помешать ему выполнять свои важные обязанности по кик-бану.

Чтобы заблокировать обработку, пока сообщение не может быть отправлено, таким образом, помещая в очередь дальнейшие сообщения, красивое решение antti также может быть изменено следующим образом:

rate = 5.0; // unit: messages
per  = 8.0; // unit: seconds
allowance = rate; // unit: messages
last_check = now(); // floating-point, e.g. usec accuracy. Unit: seconds

when (message_received):
  current = now();
  time_passed = current - last_check;
  last_check = current;
  allowance += time_passed * (rate / per);
  if (allowance > rate):
    allowance = rate; // throttle
  if (allowance < 1.0):
    time.sleep( (1-allowance) * (per/rate))
    forward_message();
    allowance = 0.0;
  else:
    forward_message();
    allowance -= 1.0;

он просто ждет, пока не будет достаточно разрешения для отправки сообщения. чтобы не начинать с двухкратной ставки, скидка также может быть инициализирована с 0.

Одним из решений является прикрепление метки времени к каждому элементу очереди и удаление элемента по истечении 8 секунд. Вы можете выполнять эту проверку каждый раз, когда очередь добавляется.

Это работает, только если вы ограничите размер очереди до 5 и откажетесь от любых добавлений, пока очередь заполнена.

Если кому-то все еще интересно, я использую этот простой вызываемый класс в сочетании с временным хранилищем значений ключа LRU, чтобы ограничить частоту запросов на IP. Использует deque, но может быть переписан для использования со списком.

from collections import deque
import time


class RateLimiter:
    def __init__(self, maxRate=5, timeUnit=1):
        self.timeUnit = timeUnit
        self.deque = deque(maxlen=maxRate)

    def __call__(self):
        if self.deque.maxlen == len(self.deque):
            cTime = time.time()
            if cTime - self.deque[0] > self.timeUnit:
                self.deque.append(cTime)
                return False
            else:
                return True
        self.deque.append(time.time())
        return False

r = RateLimiter()
for i in range(0,100):
    time.sleep(0.1)
    print(i, "block" if r() else "pass")

Сохраните время, когда были отправлены последние пять строк. Удерживайте сообщения в очереди до тех пор, пока пятое самое последнее сообщение (если оно существует) не пройдет как минимум 8 секунд в прошлом (с last_five в виде массива раз):

now = time.time()
if len(last_five) == 0 or (now - last_five[-1]) >= 8.0:
    last_five.insert(0, now)
    send_message(msg)
if len(last_five) > 5:
    last_five.pop()

Просто реализация Python кода из принятого ответа.

import time

class Object(object):
    pass

def get_throttler(rate, per):
    scope = Object()
    scope.allowance = rate
    scope.last_check = time.time()
    def throttler(fn):
        current = time.time()
        time_passed = current - scope.last_check;
        scope.last_check = current;
        scope.allowance = scope.allowance + time_passed * (rate / per)
        if (scope.allowance > rate):
          scope.allowance = rate
        if (scope.allowance < 1):
          pass
        else:
          fn()
          scope.allowance = scope.allowance - 1
    return throttler

Мне нужна была вариация в Scala. Вот:

case class Limiter[-A, +B](callsPerSecond: (Double, Double), f: A ⇒ B) extends (A ⇒ B) {

  import Thread.sleep
  private def now = System.currentTimeMillis / 1000.0
  private val (calls, sec) = callsPerSecond
  private var allowance  = 1.0
  private var last = now

  def apply(a: A): B = {
    synchronized {
      val t = now
      val delta_t = t - last
      last = t
      allowance += delta_t * (calls / sec)
      if (allowance > calls)
        allowance = calls
      if (allowance < 1d) {
        sleep(((1 - allowance) * (sec / calls) * 1000d).toLong)
      }
      allowance -= 1
    }
    f(a)
  }

}

Вот как это можно использовать:

val f = Limiter((5d, 8d), { 
  _: Unit ⇒ 
    println(System.currentTimeMillis) 
})
while(true){f(())}

Синтаксис Java, основная идея: не считать итерации, считать время скачка. Запомните время последнего прыжка, подождите необходимое время, чтобы не превысить скорость прыжка.

      public static void limitRate(int rate, AtomicLong leapTime, ReentrantLock rateLock) {
    long targetLeapTime = 1_000_000_000 / rate;
    rateLock.lock();
    try {
        long timeSnapshot = nanoTime();
        long waitTime = targetLeapTime - (timeSnapshot - leapTime.get());
        if (waitTime > 0) {

            LockSupport.parkNanos(waitTime);

            leapTime.set(timeSnapshot + waitTime);
        } else {
            leapTime.set(timeSnapshot);
        }
    } finally {
        rateLock.unlock();
    }
}

еще одно решение

      from collections import deque
from datetime import timedelta
from time import sleep

class RateLimiter:
    def __init__(self, items: int, per: timedelta = timedelta(seconds=1)):
        self.items = items
        self.per = per
        self.deque = deque(maxlen=items)

    def count(self):
        now = datetime.now()
        self.deque.append(now)

    def time_to_wait(self) -> timedelta:
        if len(self.deque) < self.deque.maxlen:
            return timedelta(0)
        now = datetime.now()
        per = now - self.deque[0]
        return max(timedelta(0), self.per - per)

    def throttle(self):
        sleep(self.time_to_wait().total_seconds())
        self.count()

if __name__ == '__main__':
    rate_limiter = RateLimiter(items=3, per=timedelta(seconds=3))

    for i in range(10):
        rate_limiter.throttle()
        print(f'{i}')

Как насчет этого:

long check_time = System.currentTimeMillis();
int msgs_sent_count = 0;

private boolean isRateLimited(int msgs_per_sec) {
    if (System.currentTimeMillis() - check_time > 1000) {
        check_time = System.currentTimeMillis();
        msgs_sent_count = 0;
    }

    if (msgs_sent_count > (msgs_per_sec - 1)) {
        return true;
    } else {
        msgs_sent_count++;
    }

    return false;
}
Другие вопросы по тегам