EasyNetQ - Как сохранить RetryCount в теле сообщения / заголовке?

Я использую EasyNetQ и мне нужно повторить неудачные сообщения в исходной очереди. Проблема заключается в том, что, хотя я успешно увеличиваю переменную TriedCount (в теле каждой msg), когда EasyNetQ публикует сообщение в очередь ошибок по умолчанию после исключения, обновленный TriedCount отсутствует в msg! Предположительно, потому что он просто сбрасывает исходное сообщение в очередь ошибок без изменений потребителя. Причина, по которой я делаю это для потребителя, заключается в том, что на сообщение может быть несколько подписчиков, и каждому может потребоваться повторить попытку разное количество раз; Итак, у меня есть словарь (ключ:subscriptionId, val:TriedCount) в каждом сообщении.

У меня есть класс обработчика сообщений, который содержит Func в качестве функции обратного вызова конкретного сообщения. Перед вызовом обратного вызова обработчик сообщений обновляет TriedCount и проверяет, находится ли он в MaxTries. Эта часть работает для повторных публикаций в процессе, но не при повторной публикации через EasyNetQ Hosepipe или EasyNetQ Management Client. Текстовые файлы, создаваемые Hosepipe, не имеют обновленного TriedCount.

public interface IMsgHandler<T> where T: class, IMessageType
{
    Task InvokeMsgCallbackFunc(T msg);
    Func<T, Task> MsgCallbackFunc { get; set; }
    bool IsTryValid(ref T msg, string refSubscriptionId); //updates  
                                                          //TriedCount and 
                                                          //publishes only 
                                                          //if it is less 
                                                          //than MaxTries
}
public interface IMessageType
{
    int MsgTypeId { get; }
    Dictionary<string, TryInfo> MsgTryInfo {get; set;}
}
[Serializable]
public class TryInfo
{   
    public int TriedCount { get; set; }

    /*Other information regarding msg attempt*/
}

public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
{

IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);

return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
}

Я также попытался переиздать себя через API управления (грубый код):

var client = new ManagementClient("http://localhost", "guest", "guest");
var vhost = client.GetVhostAsync("/").Result;
var errQueue = client.GetQueueAsync("EasyNetQ_Default_Error_Queue", 
vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, 
Ackmodes.ack_requeue_true);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, 
crit).Result;

foreach (var errMsg in errMsgs)
{
    var pubRes = client.PublishAsync(client.GetExchangeAsync(errMsg.Exchange, vhost).Result,
                                 new PublishInfo(errMsg.RoutingKey, errMsg.Payload)).Result;
        }

Это работает, но только снова публикуется в очереди ошибок, что не очень хорошо, потому что мне нужно опубликовать его в исходной очереди, но эта информация здесь недоступна. Кроме того, я не знаю, как добавить / обновить информацию о повторных попытках в теле сообщения на этом этапе.

Я исследовал эту библиотеку, чтобы добавить заголовки к сообщению, но я не вижу, обновляется ли счетчик в теле, как / почему будет обновляться счетчик в заголовке.

Есть ли способ сохранить TriedCount без использования шины Advanced (в этом случае я мог бы использовать сам клиент RabbitMQ .Net)?

0 ответов

На всякий случай, если это поможет кому-то другому, я в конце концов реализовал свой собственный IErrorMessageSerializer (в отличие от реализации всей IConsumerErrorStrategy, что казалось излишним). Причина, по которой я добавляю информацию о повторных попытках в тело (вместо заголовка), заключается в том, что EasyNetQ не обрабатывает сложные типы в заголовке (во всяком случае, не из коробки). Таким образом, использование словаря дает больше контроля для разных потребителей. Я регистрирую пользовательский сериализатор во время создания шины следующим образом:

_defaultBus = RabbitHutch.CreateBus(currentConnString, serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId)));

И просто реализовал метод Serialize вот так:

 public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
 {
        public string Serialize(byte[] messageBody)
        {
             string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
             var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);

             // Add/update RetryInformation into objectifiedMsgBody here
             // I have a dictionary that saves <key:consumerId, val: TryInfoObj>

             return JsonConvert.SerializeObject(objectifiedMsgBody);
        }
  }

Фактическая повторная попытка выполняется с помощью простого консольного приложения / службы Windows периодически через API управления EasyNetQ:

            var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
            var vhost = client.GetVhostAsync("/").Result;
            var aliveRes = client.IsAliveAsync(vhost).Result;
            var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
            var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
            var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
            foreach (var errMsg in errMsgs)
            {
                var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
                var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
                pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
                pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
                pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
                var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result,
                     pubInfo).Result;
            }

Включена ли повторная попытка или нет, известно самому моему потребителю, что дает ему больше контроля, поэтому он может обрабатывать повторное сообщение или просто игнорировать его. После игнорирования сообщение, очевидно, не будет повторяться снова; вот как работает EasyNetQ.

Другие вопросы по тегам