Как непрерывно прослушивать сообщения Pub/Sub в приложении ASP.NET Core?
Я хотел бы реализовать ASP.NET Core API, который не отвечает на HTTP-запросы, но при запуске начинает прослушивать сообщения Google Cloud Pub/Sub, и он продолжает прослушивать бесконечно на протяжении всей своей жизни.
Каков предпочтительный способ реализовать это с помощью официального Pub/Sub SDK?
Я могу придумать два пути:
Подход 1: просто используйте SimpleSubscriber
и в Startup.Configure
начать слушать сообщения:
public void Configure(IApplicationBuilder app)
{
var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName);
var receivedMessages = new List<PubsubMessage>();
simpleSubscriber.StartAsync((msg, cancellationToken) =>
{
// Process the message here.
return Task.FromResult(SimpleSubscriber.Reply.Ack);
});
...
}
Подход 2. Используйте библиотеку, специально созданную для периодического запуска задания, например, Quartz, Hangfire или FluentScheduler, и каждый раз, когда задание запускается, извлекайте новые сообщения с помощью SubscriberClient
,
Какой из них является предпочтительным подходом? Первый кажется более простым, но я не уверен, действительно ли он надежен.
1 ответ
Первый подход, безусловно, заключается в том, как это предполагается использовать.
Тем не менее, см. Документы для StartAsync
:
Начинает получать сообщения. Возвращенный
Task
завершается, когда либоStopAsync(CancellationToken)
вызывается или если возникает неисправимая ошибка. Этот метод не может быть вызван более одного разаSimpleSubscriber
пример.
Так что вам нужно справиться с неожиданным StartAsync
отключение при неисправимой ошибке. Самое простое, что можно сделать, это использовать внешний цикл, хотя, учитывая, что эти ошибки считаются неисправимыми, вероятно, что-то в вызове необходимо изменить, прежде чем он сможет успешно завершиться.
Код может выглядеть так:
while (true)
{
// Each SimpleSubscriber instance must only be used once.
var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName);
try
{
await simpleSubscriber.StartAsync((msg, cancellationToken) =>
{
// Process the message here.
return Task.FromResult(SimpleSubscriber.Reply.Ack);
});
}
catch (Exception e)
{
// Handle the unrecoverable error somehow...
}
}
Если это не работает должным образом, пожалуйста, сообщите нам об этом.