Как непрерывно прослушивать сообщения Pub/Sub в приложении ASP.NET Core?

Я хотел бы реализовать ASP.NET Core API, который не отвечает на HTTP-запросы, но при запуске начинает прослушивать сообщения Google Cloud Pub/Sub, и он продолжает прослушивать бесконечно на протяжении всей своей жизни.

Каков предпочтительный способ реализовать это с помощью официального Pub/Sub SDK?

Я могу придумать два пути:

Подход 1: просто используйте SimpleSubscriberи в Startup.Configure начать слушать сообщения:

public void Configure(IApplicationBuilder app)
{
    var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName);
    var receivedMessages = new List<PubsubMessage>();

    simpleSubscriber.StartAsync((msg, cancellationToken) =>
    {
        // Process the message here.

        return Task.FromResult(SimpleSubscriber.Reply.Ack);
    });

    ...
}

Подход 2. Используйте библиотеку, специально созданную для периодического запуска задания, например, Quartz, Hangfire или FluentScheduler, и каждый раз, когда задание запускается, извлекайте новые сообщения с помощью SubscriberClient,

Какой из них является предпочтительным подходом? Первый кажется более простым, но я не уверен, действительно ли он надежен.

1 ответ

Решение

Первый подход, безусловно, заключается в том, как это предполагается использовать.

Тем не менее, см. Документы для StartAsync:

Начинает получать сообщения. Возвращенный Task завершается, когда либо StopAsync(CancellationToken) вызывается или если возникает неисправимая ошибка. Этот метод не может быть вызван более одного раза SimpleSubscriber пример.

Так что вам нужно справиться с неожиданным StartAsync отключение при неисправимой ошибке. Самое простое, что можно сделать, это использовать внешний цикл, хотя, учитывая, что эти ошибки считаются неисправимыми, вероятно, что-то в вызове необходимо изменить, прежде чем он сможет успешно завершиться.

Код может выглядеть так:

while (true)
{
    // Each SimpleSubscriber instance must only be used once.
    var simpleSubscriber = await SimpleSubscriber.CreateAsync(subscriptionName);
    try
    {
        await simpleSubscriber.StartAsync((msg, cancellationToken) =>
        {
            // Process the message here.
            return Task.FromResult(SimpleSubscriber.Reply.Ack);
        });
    }
    catch (Exception e)
    {
        // Handle the unrecoverable error somehow...
    }
}

Если это не работает должным образом, пожалуйста, сообщите нам об этом.

Другие вопросы по тегам