Node-amqp - отклонение сообщения после X попыток
Как реализовать механизм, который отклоняет сообщение после нескольких настраиваемых попыток запроса?
Другими словами, если я подписываюсь на очередь, я хочу гарантировать, что одно и то же сообщение не будет доставлено более X раз.
Мой пример кода:
q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) {
try{
doSomething(data);
} catch(e) {
message.reject(true);
}
}
2 ответа
На мой взгляд, лучшим решением является обработка этих ошибок в вашем приложении и отклонение их, когда приложение решило, что оно не может обработать сообщение.
Если вы не хотите терять информацию, приложение должно отклонить сообщение только после того, как оно отправило то же сообщение в очередь ошибок.
код не проверен:
q.subscribe({ack: true}, function () {
var numOfRetries = 0;
var args = arguments;
var self = this;
var promise = doWork.apply(self, args);
for (var numOfRetries = 0; numOfRetries < MAX_RETRIES; numOfRetries++) {
promise = promise.fail(function () { return doWork.apply(self, args); });
}
promise.fail(function () {
sendMessageToErrorQueue.apply(self, args);
rejectMessage.apply(self, args);
})
})
Одним из возможных решений является хеширование сообщения с использованием определенной вами хэш-функции, а затем проверка объекта кэша на наличие этого хеша. Если он есть, добавьте его в кэш до настраиваемого максимума, а если его там нет, установите его на 1. Вот быстрый и грязный прототип для вас (обратите внимание, что mcache
объект должен находиться в области действия всех подписчиков):
var mcache = {}, maxRetries = 3;
q.subscribe({ack: true}, function(data,headers,deliveryInfo,message) {
var messagehash = hash(message);
if(mcache[messagehash] === undefined){
mcache[messagehash] = 0;
}
if(mcache[messagehash] > maxRetries) {
q.shift(true,false); //reject true, requeue false (discard message)
delete mcache[messagehash]; //don't leak memory
} else {
try{
doSomething(data);
q.shift(false); //reject false
delete mcache[messagehash]; //don't leak memory
} catch(e) {
mcache[messagehash]++;
q.shift(true,true); //reject true, requeue true
}
}
}
если сообщение имеет GUID, вы можете просто вернуть его в хеш-функцию.