Безопасно ли ожидать Задачу в методе OnNext() Обозревателя?
Я создал пользовательский Observer, который в основном выполняет асинхронную задачу в своем методе OnNext().
Я задаюсь вопросом, хорошая ли идея сделать это, имея в виду, что async void не велика.
public class MessageObserver : IObserver<Message>
{
private IDisposable _unsubscriber;
private readonly IQueueClient _client;
private readonly TelemetryClient _telemetryClient;
public MessageObserver(IQueueClient client, TelemetryClient telemetryClient)
{
_client = client;
_telemetryClient = telemetryClient;
}
public virtual void Subscribe(IObservable<Message> provider)
{
_unsubscriber = provider.Subscribe(this);
}
public virtual void Unsubscribe()
{
_unsubscriber.Dispose();
}
public virtual void OnCompleted()
{
}
public virtual void OnError(Exception error)
{
}
public virtual async void OnNext(Message message)
{
try
{
await _client.SendAsync(message);
}
catch (Exception ex)
{
_telemetryClient.TrackException(ex);
}
}
}
РЕДАКТИРОВАТЬ / Добавить код
У меня есть API, к которому я отправляю ресурс из клиента Angular, и как только ресурс записывается в базу данных, я немедленно отправляю сообщение в служебную шину Azure, а затем возвращаю ранее записанную сущность.
Я не хочу ждать отправки сообщения служебной шины Azure, прежде чем вернуться обратно к клиенту, поэтому я хочу уведомить Rx Observer У меня есть новое сообщение, которое необходимо асинхронно обрабатывать в другом потоке.
Вот моя структура:
// POST: /api/management/campaign
[HttpPost]
public async Task<IActionResult> Create([FromBody] CampaignViewModel model)
{
try
{
if (ModelState.IsValid)
{
var createdCampaign = await _campaignService.CreateCampaign(Mapping.ToCampaign(model));
_upsertServiceBus.SendMessage(new Message(Encoding.UTF8.GetBytes(createdCampaign.CampaignId.ToString())));
return Ok(Mapping.ToCampaignViewModel(createdCampaign));
}
return BadRequest(ModelState);
}
catch (Exception ex)
{
_telemetryClient.TrackException(ex);
return BadRequest(new OpenIdConnectResponse
{
Error = OpenIdConnectConstants.Errors.InvalidRequest,
ErrorDescription = Constants.GenericError
});
}
}
-
public class BusService : IBusService
{
private readonly IObservable<Message> _messageObservable;
private readonly ICollection<Message> _messages = new Collection<Message>();
private readonly IQueueClient _queueClient;
private readonly MessageObserver _messageObserver;
private readonly TelemetryClient _telemetryClient;
protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
{
_telemetryClient = telemetryClient;
_queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
_messageObservable = _messages.ToObservable();
_messageObserver = new MessageObserver(_queueClient, _telemetryClient);
_messageObserver.Subscribe(_messageObservable);
}
public void SendMessage(Message message)
{
_messageObserver.OnNext(message);
}
}
РЕДАКТИРОВАТЬ / Решение с помощью ответа @ Шломо:
public class BusService : IBusService
{
private readonly IQueueClient _queueClient;
private readonly TelemetryClient _telemetryClient;
private readonly Subject<Message> _subject = new Subject<Message>();
protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
{
_telemetryClient = telemetryClient;
_queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
_subject
.ObserveOn(TaskPoolScheduler.Default)
.SelectMany(message =>
{
return Observable.FromAsync(() =>
{
var waitAndRetryPolicy = Policy
.Handle<Exception>()
.WaitAndRetryAsync(3, retryAttempt =>
TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
(exception, retryCount, context) =>
{
_telemetryClient.TrackEvent(
$"Sending message to Azure Service Bus failed with exception ${exception.Message}. Retrying...");
}
);
return waitAndRetryPolicy.ExecuteAsync(async ct =>
{
_telemetryClient.TrackEvent("Sending message to Azure Service Bus...");
await _queueClient.SendAsync(message);
}, CancellationToken.None);
});
})
.Subscribe(unit => { _telemetryClient.TrackEvent("Message sent to Azure Service Bus."); },
ex => _telemetryClient.TrackException(ex));
}
public void SendMessage(Message message)
{
_subject.OnNext(message);
}
}
1 ответ
Я не могу воспроизвести или проверить, но, надеюсь, это поможет вам начать.
Это решение заменяет _messages
, _messageObserver
а также _messageObservable
с темой и реактивным запросом. Пара замечаний:
ObserveOn
позволяет перемещать темы, изменяя "планировщики". Я выбралTaskPoolScheduler
который выполнит остальную часть запроса в другой Задаче.- Я бы порекомендовал назвать синхронную версию
_queueClient.SendAsync
поскольку этот пример позволяет Rx обрабатывать потоки. - Это решение использует обработку исключения Rx, которая прекратит наблюдаемое / обработку в случае исключения. Если вы хотите, чтобы он автоматически перезагружался, добавьте
.Catch/.Retry
,
Код:
public class BusService : IBusService
{
private readonly IQueueClient _queueClient;
private readonly TelemetryClient _telemetryClient;
private readonly Subject<Message> _subject = new Subject<Message>();
protected BusService(IConfiguration configuration, string queueName, TelemetryClient telemetryClient)
{
_telemetryClient = telemetryClient;
_queueClient = new QueueClient(configuration["ServiceBusConnectionString"], queueName);
_subject
.ObserveOn(TaskPoolScheduler.Default) // handle on an available task
.Select(msg => _queueClient.Send(msg)) // syncronous, not async, because already on a different task
.Subscribe(result => { /* log normal result */}, ex => _telemetryClient.TrackException(e), () => {});
}
public void SendMessage(Message message)
{
_subject.OnNext(message);
}
}
Как я уже говорил, код использует Subject
, и вы найдете здесь множество вопросов и ответов, не рекомендуя их. Если вы хотите, вы можете заменить предмет на событие и наблюдаемое сидение на этом событии. Субъекты легче демонстрировать, и я бы поспорил хорошо, когда их держат в секрете.