Безопасно ли ожидать Задачу в методе OnNext() Обозревателя?

Я создал пользовательский Observer, который в основном выполняет асинхронную задачу в своем методе OnNext().

Я задаюсь вопросом, хорошая ли идея сделать это, имея в виду, что async void не велика.

public class MessageObserver : IObserver<Message>
{
    private IDisposable _unsubscriber;
    private readonly IQueueClient _client;
    private readonly TelemetryClient _telemetryClient;

    public MessageObserver(IQueueClient client, TelemetryClient telemetryClient)
    {
        _client = client;
        _telemetryClient = telemetryClient;
    }

    public virtual void Subscribe(IObservable<Message> provider)
    {
        _unsubscriber = provider.Subscribe(this);
    }

    public virtual void Unsubscribe()
    {
        _unsubscriber.Dispose();
    }

    public virtual void OnCompleted()
    {
    }

    public virtual void OnError(Exception error)
    {
    }

    public virtual async void OnNext(Message message)
    {
        try
        {
            await _client.SendAsync(message);
        }
        catch (Exception ex)
        {
            _telemetryClient.TrackException(ex);
        }
    }
}

РЕДАКТИРОВАТЬ / Добавить код

У меня есть API, к которому я отправляю ресурс из клиента Angular, и как только ресурс записывается в базу данных, я немедленно отправляю сообщение в служебную шину Azure, а затем возвращаю ранее записанную сущность.

Я не хочу ждать отправки сообщения служебной шины Azure, прежде чем вернуться обратно к клиенту, поэтому я хочу уведомить Rx Observer У меня есть новое сообщение, которое необходимо асинхронно обрабатывать в другом потоке.

Вот моя структура:

    // POST: /api/management/campaign
    [HttpPost]
    public async Task<IActionResult> Create([FromBody] CampaignViewModel model)
    {
        try
        {
            if (ModelState.IsValid)
            {
                var createdCampaign = await _campaignService.CreateCampaign(Mapping.ToCampaign(model));
                _upsertServiceBus.SendMessage(new Message(Encoding.UTF8.GetBytes(createdCampaign.CampaignId.ToString())));
                return Ok(Mapping.ToCampaignViewModel(createdCampaign));
            }

            return BadRequest(ModelState);
        }
        catch (Exception ex)
        {
            _telemetryClient.TrackException(ex);
            return BadRequest(new OpenIdConnectResponse
            {
                Error = OpenIdConnectConstants.Errors.InvalidRequest,
                ErrorDescription = Constants.GenericError
            });
        }
    }

-

    public class BusService : IBusService
    {
        private readonly IObservable<Message> _messageObservable;
        private readonly ICollection<Message> _messages = new Collection<Message>();
        private readonly IQueueClient _queueClient;
        private readonly MessageObserver _messageObserver;
        private readonly TelemetryClient _telemetryClient;

        protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
        {
            _telemetryClient = telemetryClient;
            _queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
            _messageObservable = _messages.ToObservable();
            _messageObserver = new MessageObserver(_queueClient, _telemetryClient);
            _messageObserver.Subscribe(_messageObservable);
        }

        public void SendMessage(Message message)
        {
            _messageObserver.OnNext(message);
        }
    }

РЕДАКТИРОВАТЬ / Решение с помощью ответа @ Шломо:

public class BusService : IBusService
{
    private readonly IQueueClient _queueClient;
    private readonly TelemetryClient _telemetryClient;
    private readonly Subject<Message> _subject = new Subject<Message>();

    protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
    {
        _telemetryClient = telemetryClient;
        _queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
        _subject
            .ObserveOn(TaskPoolScheduler.Default)
            .SelectMany(message =>
            {
                return Observable.FromAsync(() =>
                {
                    var waitAndRetryPolicy = Policy
                        .Handle<Exception>()
                        .WaitAndRetryAsync(3, retryAttempt =>
                                TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                            (exception, retryCount, context) =>
                            {
                                _telemetryClient.TrackEvent(
                                    $"Sending message to Azure Service Bus failed with exception ${exception.Message}. Retrying...");
                            }
                        );

                    return waitAndRetryPolicy.ExecuteAsync(async ct =>
                    {
                        _telemetryClient.TrackEvent("Sending message to Azure Service Bus...");
                        await _queueClient.SendAsync(message);
                    }, CancellationToken.None);
                });
            })
            .Subscribe(unit => { _telemetryClient.TrackEvent("Message sent to Azure Service Bus."); },
                ex => _telemetryClient.TrackException(ex));
    }

    public void SendMessage(Message message)
    {
        _subject.OnNext(message);
    }
}

1 ответ

Решение

Я не могу воспроизвести или проверить, но, надеюсь, это поможет вам начать.

Это решение заменяет _messages, _messageObserver а также _messageObservable с темой и реактивным запросом. Пара замечаний:

  • ObserveOn позволяет перемещать темы, изменяя "планировщики". Я выбрал TaskPoolScheduler который выполнит остальную часть запроса в другой Задаче.
  • Я бы порекомендовал назвать синхронную версию _queueClient.SendAsync поскольку этот пример позволяет Rx обрабатывать потоки.
  • Это решение использует обработку исключения Rx, которая прекратит наблюдаемое / обработку в случае исключения. Если вы хотите, чтобы он автоматически перезагружался, добавьте .Catch/.Retry,

Код:

public class BusService : IBusService
{
    private readonly IQueueClient _queueClient;
    private readonly TelemetryClient _telemetryClient;
    private readonly Subject<Message> _subject = new Subject<Message>();

    protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
    {
        _telemetryClient = telemetryClient;
        _queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
        _subject
            .ObserveOn(TaskPoolScheduler.Default)  // handle on an available task
            .Select(msg => _queueClient.Send(msg)) // syncronous, not async, because already on a different task
            .Subscribe(result => { /* log normal result */}, ex => _telemetryClient.TrackException(e), () => {});
    }

    public void SendMessage(Message message)
    {
        _subject.OnNext(message);
    }
}

Как я уже говорил, код использует Subject, и вы найдете здесь множество вопросов и ответов, не рекомендуя их. Если вы хотите, вы можете заменить предмет на событие и наблюдаемое сидение на этом событии. Субъекты легче демонстрировать, и я бы поспорил хорошо, когда их держат в секрете.

Другие вопросы по тегам