Драйвер RabbitMQ C# прекращает прием сообщений
У вас есть указания, как определить, когда возникла проблема с подпиской, чтобы я мог подключиться?
Мой сервис использует RabbitMQ.Client.MessagePatterns.Subscription для своей подписки. Через некоторое время мой клиент молча перестает получать сообщения. Я подозреваю проблемы с сетью, так как наше VPN-соединение не самое надежное.
Я некоторое время читал документы, чтобы найти ключ, чтобы узнать, когда эта подписка может быть нарушена из-за проблем с сетью без особой удачи. Я пытался проверить, что соединение и канал все еще открыты, но всегда кажется, что он все еще открыт.
Сообщения, которые он обрабатывает, работают довольно хорошо и возвращаются в очередь, поэтому я не думаю, что это проблема с "ack".
Я уверен, что я просто упускаю что-то простое, но я еще не нашел это.
public void Run(string brokerUri, Action<byte[]> handler)
{
log.Debug("Connecting to broker: {0}".Fill(brokerUri));
ConnectionFactory factory = new ConnectionFactory { Uri = brokerUri };
using (IConnection connection = factory.CreateConnection())
{
using (IModel channel = connection.CreateModel())
{
channel.QueueDeclare(queueName, true, false, false, null);
using (Subscription subscription = new Subscription(channel, queueName, false))
{
while (!Cancelled)
{
BasicDeliverEventArgs args;
if (!channel.IsOpen)
{
log.Error("The channel is no longer open, but we are still trying to process messages.");
throw new InvalidOperationException("Channel is closed.");
}
else if (!connection.IsOpen)
{
log.Error("The connection is no longer open, but we are still trying to process message.");
throw new InvalidOperationException("Connection is closed.");
}
bool gotMessage = subscription.Next(250, out args);
if (gotMessage)
{
log.Debug("Received message");
try
{
handler(args.Body);
}
catch (Exception e)
{
log.Debug("Exception caught while processing message. Will be bubbled up.", e);
throw;
}
log.Debug("Acknowledging message completion");
subscription.Ack(args);
}
}
}
}
}
}
ОБНОВИТЬ:
Я смоделировал сбой сети, запустив сервер на виртуальной машине, и у меня получилось исключение (RabbitMQ.Client.Exceptions.OperationInterruptedException: операция AMQP была прервана), когда я разрываю соединение достаточно долго, поэтому, возможно, это не сеть вопрос. Теперь я не знаю, что это было бы, но это потерпело неудачу после пары часов бега.
1 ответ
РЕДАКТИРОВАТЬ: Поскольку я по-прежнему получаю положительные отзывы по этому вопросу, я должен отметить, что клиент.NET RabbitMQ теперь имеет эту встроенную функциональность: https://www.rabbitmq.com/dotnet-api-guide.html
В идеале вы должны быть в состоянии использовать это и избегать ручной реализации логики переподключения.
Недавно мне пришлось реализовать почти то же самое. Из того, что я могу сказать, большая часть доступной информации о RabbitMQ предполагает, что либо ваша сеть очень надежна, либо вы запускаете брокер RabbitMQ на той же машине, что и любой клиент, отправляющий или получающий сообщения, что позволяет Rabbit решать любые проблемы с подключением.
На самом деле не так сложно настроить клиент Rabbit таким образом, чтобы он был устойчив к разрыву соединений, но есть несколько особенностей, с которыми вам нужно иметь дело.
Первое, что вам нужно сделать, это включить сердцебиение:
ConnectionFactory factory = new ConnectionFactory()
{
Uri = brokerUri,
RequestedHeartbeat = 30,
};
Если установить для параметра "RequestedHeartbeat" значение 30, клиент будет каждые 30 секунд проверять, живо ли соединение. Без этого подписчик сообщения будет радостно ждать, пока не придет другое сообщение, не зная, что его соединение разорвано.
Включение пульса также заставляет сервер проверить, установлено ли соединение, что может быть очень важным. Если соединение обрывается после того, как сообщение было получено подписчиком, но до того, как оно было подтверждено, сервер просто предполагает, что клиент занимает много времени, и сообщение "застревает" на мертвом соединении, пока оно не будет закрыто. Когда пульс включен, сервер распознает, когда соединение обрывается, и закрывает его, помещая сообщение обратно в очередь, чтобы его мог обработать другой абонент. Без пульса я должен был войти вручную и закрыть соединение в пользовательском интерфейсе управления Rabbit, чтобы застрявшее сообщение могло быть передано подписчику.
Во-вторых, вам нужно будет справиться OperationInterruptedException
, Как вы заметили, обычно это исключение, которое выдает клиент Rabbit, когда он замечает, что соединение было прервано. Если IModel.QueueDeclare()
вызывается, когда соединение было прервано, это исключение, которое вы получите. Обработайте это исключение, избавившись от своей подписки, канала и соединения и создав новые.
Наконец, вам придется обрабатывать действия своего потребителя, когда он пытается получать сообщения из закрытого соединения. К сожалению, каждый другой способ потребления сообщений из очереди в клиенте Rabbit, похоже, реагирует по-разному. QueueingBasicConsumer
бросает EndOfStreamException
если ты позвонишь QueueingBasicConsumer.Queue.Dequeue
по закрытому соединению. EventingBasicConsumer
ничего не делает, так как просто ждет сообщения. Из того, что я могу сказать, попробовав это, Subscription
класс, который вы используете, кажется, возвращает истину от вызова Subscription.Next
, но ценность args
нулевой. Еще раз, справьтесь с этим, избавившись от своего соединения, канала и подписки и создав их заново.
Значение connection.IsOpen
будет обновлено до False, когда соединение не будет установлено с включенным пульсом, так что вы можете проверить это, если хотите. Однако, поскольку сердцебиение выполняется в отдельном потоке, вам все равно придется обрабатывать случай, когда соединение открыто, когда вы его проверяете, но закрывается до того, как subscription.Next()
называется.
И последнее, на что нужно обратить внимание: IConnection.Dispose()
, Этот вызов бросит EndOfStreamException
если вы позвоните утилизировать после того, как соединение было закрыто. Мне это кажется ошибкой, и я не люблю не звонить утилизировать IDisposable
объект, поэтому я называю это и проглатываю исключение.
Собираем все это в быстрый и грязный пример:
public bool Cancelled { get; set; }
IConnection _connection = null;
IModel _channel = null;
Subscription _subscription = null;
public void Run(string brokerUri, string queueName, Action<byte[]> handler)
{
ConnectionFactory factory = new ConnectionFactory()
{
Uri = brokerUri,
RequestedHeartbeat = 30,
};
while (!Cancelled)
{
try
{
if(_subscription == null)
{
try
{
_connection = factory.CreateConnection();
}
catch(BrokerUnreachableException)
{
//You probably want to log the error and cancel after N tries,
//otherwise start the loop over to try to connect again after a second or so.
continue;
}
_channel = _connection.CreateModel();
_channel.QueueDeclare(queueName, true, false, false, null);
_subscription = new Subscription(_channel, queueName, false);
}
BasicDeliverEventArgs args;
bool gotMessage = _subscription.Next(250, out args);
if (gotMessage)
{
if(args == null)
{
//This means the connection is closed.
DisposeAllConnectionObjects();
continue;
}
handler(args.Body);
_subscription.Ack(args);
}
}
catch(OperationInterruptedException ex)
{
DisposeAllConnectionObjects();
}
}
DisposeAllConnectionObjects();
}
private void DisposeAllConnectionObjects()
{
if(_subscription != null)
{
//IDisposable is implemented explicitly for some reason.
((IDisposable)_subscription).Dispose();
_subscription = null;
}
if(_channel != null)
{
_channel.Dispose();
_channel = null;
}
if(_connection != null)
{
try
{
_connection.Dispose();
}
catch(EndOfStreamException)
{
}
_connection = null;
}
}