ЛАГ. Дросселирование для внешнего обслуживания

Я играю с Лагом, это выглядит красиво, но я полностью проиграл с одной проблемой.

Допустим, у меня есть зависимость от внешнего HTTP-сервиса, он разрешает только 10 запросов / сек, в другом случае даже может забанить:) Я гуглил, но не нашел ни одного работающего примера. Я могу обернуть сервис в нетипизированный актер и добавить к нему ограничитель скорости, но я не понимаю, как реализовать его поверх сервиса Akka Typed или Lagom.

Может, кто-то уже решил такую ​​задачу? Спасибо!

1 ответ

Вы хотите ведро с токеном. В Akka-Streams это встроено в Flow.throttle, но похоже, что вы используете raw akka, поэтому не можете его использовать. В Akka существует реализация корзины токенов, но, к сожалению, она не дает никаких указаний по использованию, и я сам ею не пользовался.

Для моих собственных случаев использования (не Akka, а используя Scala Futures) я написал свое собственное ведение токена. Это позволяет мне ссылаться на запуск будущего на основе указанного лимита. Он закодирован против планировщика monix, но для этой цели он очень похож на расписание Akka:

import java.util.concurrent.ConcurrentLinkedQueue

import monix.execution.Scheduler.Implicits.global
import monix.execution.atomic.AtomicInt

import scala.concurrent.{Future, Promise}
import scala.concurrent.duration._

case class RateLimiter(duration: FiniteDuration, maxInvocations: Int) {

  @volatile var permits: Int = maxInvocations
  val queue = new ConcurrentLinkedQueue[() => Any]()

  global.scheduleAtFixedRate(duration, duration) {
    this synchronized {
      permits = maxInvocations

      while (!queue.isEmpty && permits > 0) {
        Option(queue.poll()).foreach { fun =>
          permits -= 1
          fun.apply()
        }
      }
    }
  }

  def apply[T](f: => Future[T]): Future[T] =
    this synchronized {
      if (permits > 0) {
        permits -= 1
        f
      } else {
        val res = Promise[T]()
        queue.add(() => { res.completeWith(f) })
        res.future
      }
    }
}

Использование

val limiter = RateLimiter(1.second, 10)

limiter {
  someWebService.asyncCall()
}
Другие вопросы по тегам