Разделы и подписки служебной шины Azure с рабочей ролью
Так что я недавно получил необходимость использовать Service Bus Topic and Subscriptions
и я следил за многими статьями и учебниками. Я смог успешно реализовать Microsoft Начало работы с темами служебной шины, а также успешно использовал Visual Studio 2017's
Worker Role
Шаблон для доступа к базе данных.
Тем не менее, я запутался в том, как правильно "объединить" оба. В то время как в статье Начало работы с темами служебной шины показано, как создать 2 приложения: одно для отправки и одно для получения, а затем для выхода, Worker Role
шаблон, кажется, бесконечно зацикливается на await Task.Delay(10000);
,
Я не уверен, как правильно "связать" их. По сути, я хочу, чтобы мой Worker Role
остаться в живых и слушать записи в своей подписке навсегда (или до тех пор, пока она не выйдет явно).
Любое руководство было бы здорово!
PS: я задал соответствующий вопрос, касающийся правильной технологии, которую я должен использовать для моего сценария в StackExchange - Software Engineering, если вы заинтересованы.
Обновление № 1 (2018/08/09)
Основываясь на ответе Арунпрабху, вот код того, как я посылаю Message
на основе статей, которые я прочитал и получаю с помощью Visual Studio 2017
"s Worker Role with Service Bus Queue
шаблон.
Отправка (в соответствии с разделом Начало работы с темами служебной шины)
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
namespace TopicsSender {
internal static class Program {
private const string ServiceBusConnectionString = "<your_connection_string>";
private const string TopicName = "test-topic";
private static ITopicClient _topicClient;
private static void Main(string[] args) {
MainAsync().GetAwaiter().GetResult();
}
private static async Task MainAsync() {
const int numberOfMessages = 10;
_topicClient = new TopicClient(ServiceBusConnectionString, TopicName);
Console.WriteLine("======================================================");
Console.WriteLine("Press ENTER key to exit after sending all the messages.");
Console.WriteLine("======================================================");
// Send messages.
await SendMessagesAsync(numberOfMessages);
Console.ReadKey();
await _topicClient.CloseAsync();
}
private static async Task SendMessagesAsync(int numberOfMessagesToSend) {
try {
for (var i = 0; i < numberOfMessagesToSend; i++) {
// Create a new message to send to the topic
var messageBody = $"Message {i}";
var message = new Message(Encoding.UTF8.GetBytes(messageBody));
// Write the body of the message to the console
Console.WriteLine($"Sending message: {messageBody}");
// Send the message to the topic
await _topicClient.SendAsync(message);
}
} catch (Exception exception) {
Console.WriteLine($"{DateTime.Now} :: Exception: {exception.Message}");
}
}
}
}
Получение (на основе Worker Role with Service Bus Queue
шаблон)
using System;
using System.Diagnostics;
using System.Net;
using System.Threading;
using Microsoft.ServiceBus.Messaging;
using Microsoft.WindowsAzure.ServiceRuntime;
namespace WorkerRoleWithSBQueue1 {
public class WorkerRole : RoleEntryPoint {
// The name of your queue
private const string ServiceBusConnectionString = "<your_connection_string>";
private const string TopicName = "test-topic";
private const string SubscriptionName = "test-sub1";
// QueueClient is thread-safe. Recommended that you cache
// rather than recreating it on every request
private SubscriptionClient _client;
private readonly ManualResetEvent _completedEvent = new ManualResetEvent(false);
public override void Run() {
Trace.WriteLine("Starting processing of messages");
// Initiates the message pump and callback is invoked for each message that is received, calling close on the client will stop the pump.
_client.OnMessage((receivedMessage) => {
try {
// Process the message
Trace.WriteLine("Processing Service Bus message: " + receivedMessage.SequenceNumber.ToString());
var message = receivedMessage.GetBody<byte[]>();
Trace.WriteLine($"Received message: SequenceNumber:{receivedMessage.SequenceNumber} Body:{message.ToString()}");
} catch (Exception e) {
// Handle any message processing specific exceptions here
Trace.Write(e.ToString());
}
});
_completedEvent.WaitOne();
}
public override bool OnStart() {
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
// Initialize the connection to Service Bus Queue
_client = SubscriptionClient.CreateFromConnectionString(ServiceBusConnectionString, TopicName, SubscriptionName);
return base.OnStart();
}
public override void OnStop() {
// Close the connection to Service Bus Queue
_client.Close();
_completedEvent.Set();
base.OnStop();
}
}
}
Обновление № 2 (2018/08/10)
После нескольких предложений от Арунпрабху и знания, что я использую разные библиотеки, ниже приведено мое текущее решение с фрагментами, взятыми из нескольких источников. Есть ли что-то, что я пропускаю, добавляю это плечо и т. Д.? В настоящее время получаю сообщение об ошибке, которое может быть связано с другим вопросом или уже получено ответ, поэтому не хочу публиковать его до дальнейшего исследования.
using System;
using System.Diagnostics;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.ServiceBus;
using Microsoft.WindowsAzure.ServiceRuntime;
namespace WorkerRoleWithSBQueue1 {
public class WorkerRole : RoleEntryPoint {
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly ManualResetEvent _runCompleteEvent = new ManualResetEvent(false);
// The name of your queue
private const string ServiceBusConnectionString = "<your_connection_string>";
private const string TopicName = "test-topic";
private const string SubscriptionName = "test-sub1";
// _client is thread-safe. Recommended that you cache
// rather than recreating it on every request
private SubscriptionClient _client;
public override void Run() {
Trace.WriteLine("Starting processing of messages");
try {
this.RunAsync(this._cancellationTokenSource.Token).Wait();
} catch (Exception e) {
Trace.WriteLine("Exception");
Trace.WriteLine(e.ToString());
} finally {
Trace.WriteLine("Finally...");
this._runCompleteEvent.Set();
}
}
public override bool OnStart() {
// Set the maximum number of concurrent connections
ServicePointManager.DefaultConnectionLimit = 12;
var result = base.OnStart();
Trace.WriteLine("WorkerRole has been started");
return result;
}
public override void OnStop() {
// Close the connection to Service Bus Queue
this._cancellationTokenSource.Cancel();
this._runCompleteEvent.WaitOne();
base.OnStop();
}
private async Task RunAsync(CancellationToken cancellationToken) {
// Configure the client
RegisterOnMessageHandlerAndReceiveMessages(ServiceBusConnectionString, TopicName, SubscriptionName);
_runCompleteEvent.WaitOne();
Trace.WriteLine("Closing");
await _client.CloseAsync();
}
private void RegisterOnMessageHandlerAndReceiveMessages(string connectionString, string topicName, string subscriptionName) {
_client = new SubscriptionClient(connectionString, topicName, subscriptionName);
var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler) {
// Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
// Set it according to how many messages the application wants to process in parallel.
MaxConcurrentCalls = 1,
// Indicates whether MessagePump should automatically complete the messages after returning from User Callback.
// False below indicates the Complete will be handled by the User Callback as in `ProcessMessagesAsync` below.
AutoComplete = false,
};
_client.RegisterMessageHandler(ProcessMessageAsync, messageHandlerOptions);
}
private async Task ProcessMessageAsync(Message message, CancellationToken token) {
try {
// Process the message
Trace.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
await _client.CompleteAsync(message.SystemProperties.LockToken);
} catch (Exception e) {
// Handle any message processing specific exceptions here
Trace.Write(e.ToString());
await _client.AbandonAsync(message.SystemProperties.LockToken);
}
}
private static Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs) {
Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
Console.WriteLine("Exception context for troubleshooting:");
Console.WriteLine($"- Endpoint: {context.Endpoint}");
Console.WriteLine($"- Entity Path: {context.EntityPath}");
Console.WriteLine($"- Executing Action: {context.Action}");
return Task.CompletedTask;
}
}
}
3 ответа
Учитывая сложность обновленного вопроса Обновление № 1 (2018/08/09), я даю отдельный ответ.
Отправитель и получатель используют разные библиотеки.
Отправитель - https://www.nuget.org/packages/Microsoft.Azure.ServiceBus/
Получатель - https://www.nuget.org/packages/WindowsAzure.ServiceBus/
Microsoft.Azure.ServiceBus имеет объект сообщения как Message, где WindowsAzure.ServiceBus имеет BrokeredMessage.
В Microsoft.Azure.ServiceBus доступен метод RegisterMessageHandler, это альтернатива для client.OnMessage() в WindowsAzure.ServiceBus. Используя это, слушатель получает сообщение как объект сообщения. Эта библиотека поддерживает асинхронное программирование, как вы ожидаете.
Обратитесь сюда за примерами из обеих библиотек.
Если вы используете Visual Studio, существует шаблон по умолчанию, доступный для создания облачной службы Azure и рабочей роли с очередью служебной шины. Там вам нужно изменить QueueClient с SubscriptionClient в WorkerRole.cs.
Затем рабочая роль останется активной, прослушивая сообщения из подписки на тему.
Вы можете найти образцы здесь. Вы должны создать рабочую роль с Service Bus Queue внутри облачной службы.
Мне интересно, есть ли у вас какая-либо конкретная причина для выбора Worker Role вместо Web Job? Если нет, вы можете использовать веб-задание с атрибутом ServiceBusTrigger, что значительно упростит ваш код. подробнее здесь...