Служебная шина Azure - получение сообщений методом OnMessage()
Следуя документации MS, было нетрудно получить сообщения из подписки. Однако, если я хочу, чтобы моя заявка получала сообщение каждый раз, когда новое сообщение публикуется - постоянный опрос. Следовательно, метод OnMessage() класса SubscriptionClient.
Документация MS гласит: " ... При вызове OnMessage клиент запускает внутренний насос сообщений, который постоянно опрашивает очередь или подписку. Этот насос сообщений состоит из бесконечного цикла, который выполняет вызов Receive(). Если время вызова истекает, он выдает следующий вызов Receive().... "
Но когда приложение работает, в тот момент, когда метод OnMessage() вызывается, только последнее сообщение (я) получено. Когда новые сообщения публикуются, постоянный опрос не работает. После того, как я попробовал много разных подходов, единственный способ заставить эту работу заставить приложение реагировать в момент получения нового сообщения - поместить код в отдельную задачу с бесконечным циклом. Это кажется совершенно неправильным на многих уровнях! (см. код ниже).
Может ли кто-нибудь помочь мне исправить мой код или опубликовать рабочий образец для выполнения той же функциональности без цикла? Спасибо!
public void ReceiveMessageFromSubscription(string topicName, string subscriptionFilter)
{
var newMessage = new MessageQueue();
int i = 0;
Task listener = Task.Factory.StartNew(() =>
{
while (true)
{
SubscriptionClient Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);
Dictionary<string, string> retrievedMessage = new Dictionary<string, string>();
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = false;
options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
Client.OnMessage((message) =>
{
try
{
retrievedMessage.Add("messageGuid", message.Properties["MessageGuid"].ToString());
retrievedMessage.Add("instanceId", message.Properties["InstanceId"].ToString());
retrievedMessage.Add("pId", message.Properties["ProcessId"].ToString());
retrievedMessage.Add("processKey", message.Properties["ProcessKey"].ToString());
retrievedMessage.Add("message", message.Properties["Message"].ToString());
newMessage.AnnounceNewMessage(retrievedMessage); // event ->
message.Complete(); // Remove message from subscription.
}
catch (Exception ex)
{
string exmes = ex.Message;
message.Abandon();
}
}, options);
retrievedMessage.Clear();
i++;
Thread.Sleep(3000);
}
});
}
1 ответ
В вашем коде есть несколько проблем, которые нужно решить -
- Он проваливается, и я предполагаю, что ваше приложение затем завершается - или, по крайней мере, поток, который прослушивает сообщения, завершается.
- Ваш цикл while повторяет код, чтобы подключить обработчик сообщений, вам нужно сделать это только один раз.
- Вам нужен способ сохранить стек вызовов и предотвратить сбор мусора в вашем приложении.
Ниже следует увидеть вас на пути к успеху. Удачи.
ManualResetEvent CompletedResetEvent = new ManualResetEvent(false);
SubscriptionClient Client;
public void ReceiveMessagesFromSubscription(string topicName, string subscriptionFilter, string connectionString)
{
Task listener = Task.Factory.StartNew(() =>
{
// You only need to set up the below once.
Client = SubscriptionClient.CreateFromConnectionString(connectionString, topicName, subscriptionFilter);
OnMessageOptions options = new OnMessageOptions();
options.AutoComplete = false;
options.AutoRenewTimeout = TimeSpan.FromMinutes(1);
options.ExceptionReceived += LogErrors;
Client.OnMessage((message) =>
{
try
{
Trace.WriteLine("Got the message with ID {0}", message.MessageId);
message.Complete(); // Remove message from subscription.
}
catch (Exception ex)
{
Trace.WriteLine("Exception occurred receiving a message: {0}" + ex.ToString());
message.Abandon(); // Failed. Leave the message for retry or max deliveries is exceeded and it dead letters.
}
}, options);
CompletedResetEvent.WaitOne();
});
}
/// <summary>
/// Added in rudimentary exception handling .
/// </summary>
/// <param name="sender">The sender.</param>
/// <param name="ex">The <see cref="ExceptionReceivedEventArgs"/> instance containing the event data.</param>
private void LogErrors(object sender, ExceptionReceivedEventArgs ex)
{
Trace.WriteLine("Exception occurred in OnMessage: {0}" + ex.ToString());
}
/// <summary>
/// Call this to stop the messages arriving from subscription.
/// </summary>
public void StopMessagesFromSubscription()
{
Client.Close(); // Close the message pump down gracefully
CompletedResetEvent.Set(); // Let the execution of the listener complete and terminate gracefully
}
В качестве альтернативы вы можете разделить сообщение более традиционным способом самостоятельно, используя ReceiveBatch:
var messages = await queueClient.ReceiveBatchAsync(10, TimeSpan.FromSeconds(30),
cancellationToken);