Ограничить количество задач, удаляемых из очереди в секунду для rabbitmq

У меня есть очередь в rabbitmq в стиле потребительского производителя, которая правильно работает как базовая очередь с циклическим перебором. Моя проблема в том, что я пытаюсь ограничить количество запросов, обрабатываемых в секунду, потому что, когда я удаляю элемент из очереди, я делаю запрос в пространство DO, которое заблокирует мой IP-адрес, если я сделаю 750 запросов или более в секунду. Я использую горутины для одновременного удаления элементов из очереди, но я хочу удалять только 500 элементов за раз в секунду, чтобы избежать превышения этого лимита. Это должно учитывать элементы, которые в настоящее время удаляются из очереди (т.е. я не могу просто вытащить 500 элементов из очереди, а затем отложить до следующей секунды), в основном до того, как он запустит код удаления из очереди, ему нужно подождать, чтобы убедиться, что есть за эту секунду не было удалено более 500 запросов. Пока у меня есть этот код, но его нет.t, похоже, работает правильно (обратите внимание, что сейчас я тестирую 2 запроса в секунду вместо 500). Время от времени у него будут очень длинные задержки (например, 20+ секунд), и я не уверен, что он правильно рассчитывает лимит. Обратите внимание, что я почти уверен, что опция предварительной выборки - это не то, что мне нужно здесь, потому что это ограничивает количество сообщений, приходящих в секунду, здесь я просто хочу ограничить количество сообщений, удаляемых из очереди одновременно в секунду.

      import (
   "os"
   "fmt"
   "github.com/streadway/amqp"
   "golang.org/x/time/rate"
   "context"
)


// Rate-limit => 2 req/s
const (
   workers = 2
)

func failOnErrorWorker(err error, msg string) {
   if err != nil {
       fmt.Println(msg)
       fmt.Println(err)
   }
}

func main() {
   // Get the env variables for the queue name and connection string
   queueName := os.Getenv("QUEUE_NAME")
   connectionString := os.Getenv("CONNECTION_STRING")

   // Set up rate limiter and context
   limiter := rate.NewLimiter(2, 1)
   ctx := context.Background()

   // Connect to the rabbitmq instance
   conn, err := amqp.Dial(connectionString)
   failOnErrorWorker(err, "Failed to connect to RabbitMQ")
   defer conn.Close()

   // Open a channel for the queue
   ch, err := conn.Channel()
   failOnErrorWorker(err, "Failed to open a channel")
   defer ch.Close()

   // Consume the messages from this queue
   msgs, err := ch.Consume(
       queueName, // queue
       "",     // consumer
       false,  // auto-ack
       false,  // exclusive
       false,  // no-local
       false,  // no-wait
       nil,    // args
   )
   failOnErrorWorker(err, "Failed to register a consumer")

   forever := make(chan bool)

   go func() {
       for d := range msgs {
           // Wait until there are less than 2 workers per second
           limiter.Wait(ctx)
           go func() {
               // Dequeue the item and acknowledge the message
               DeQueue(d.Body)
               d.Ack(false)
           } ()
       }
   }()

   fmt.Println(" [*] Waiting for messages. To exit press CTRL+C")
   // Continually run the worker
   <-forever
}

0 ответов

Другие вопросы по тегам