Node-amqp - отклонение сообщения после X попыток

Как реализовать механизм, который отклоняет сообщение после нескольких настраиваемых попыток запроса?

Другими словами, если я подписываюсь на очередь, я хочу гарантировать, что одно и то же сообщение не будет доставлено более X раз.

Мой пример кода:

q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) {
  try{
    doSomething(data);
  } catch(e) {
   message.reject(true);
  }
}

2 ответа

Решение

На мой взгляд, лучшим решением является обработка этих ошибок в вашем приложении и отклонение их, когда приложение решило, что оно не может обработать сообщение.

Если вы не хотите терять информацию, приложение должно отклонить сообщение только после того, как оно отправило то же сообщение в очередь ошибок.

код не проверен:

q.subscribe({ack: true}, function () {
  var numOfRetries = 0;
  var args = arguments;
  var self = this;
  var promise = doWork.apply(self, args);
  for (var numOfRetries = 0; numOfRetries < MAX_RETRIES; numOfRetries++) {
    promise = promise.fail(function () { return doWork.apply(self, args); });
  }

  promise.fail(function () {
    sendMessageToErrorQueue.apply(self, args);
    rejectMessage.apply(self, args);
  })
})

Одним из возможных решений является хеширование сообщения с использованием определенной вами хэш-функции, а затем проверка объекта кэша на наличие этого хеша. Если он есть, добавьте его в кэш до настраиваемого максимума, а если его там нет, установите его на 1. Вот быстрый и грязный прототип для вас (обратите внимание, что mcache объект должен находиться в области действия всех подписчиков):

var mcache = {}, maxRetries = 3;

q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) {
  var messagehash = hash(message);
  if(mcache[messagehash] === undefined){
    mcache[messagehash] = 0;
  }
  if(mcache[messagehash] > maxRetries) {
    q.shift(true,false); //reject true, requeue false (discard message)
    delete mcache[messagehash]; //don't leak memory
  } else {
    try{
      doSomething(data);
      q.shift(false); //reject false
      delete mcache[messagehash]; //don't leak memory
    } catch(e) {
      mcache[messagehash]++;
      q.shift(true,true); //reject true, requeue true
    }
  }
}

если сообщение имеет GUID, вы можете просто вернуть его в хеш-функцию.

Другие вопросы по тегам