Как перезвонить асинхронной функции из RX подписаться?
Я хотел бы перезвонить асинхронной функции в рамках подписки Rx.
Например, вот так:
public class Consumer
{
private readonly Service _service = new Service();
public ReplaySubject<string> Results = new ReplaySubject<string>();
public void Trigger()
{
Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
}
public Task RunAsync()
{
return _service.DoAsync();
}
}
public class Service
{
public async Task<string> DoAsync()
{
return await Task.Run(() => Do());
}
private static string Do()
{
Thread.Sleep(TimeSpan.FromMilliseconds(200));
throw new ArgumentException("invalid!");
return "foobar";
}
}
[Test]
public async Task Test()
{
var sut = new Consumer();
sut.Trigger();
var result = await sut.Results.FirstAsync();
}
Что нужно сделать, чтобы правильно перехватить исключение?
4 ответа
Вы не хотите передавать async
метод для Subscribe
, потому что это создаст async void
метод. Делай все возможное, чтобы избежать async void
,
В вашем случае, я думаю, что вы хотите, чтобы позвонить async
метод для каждого элемента последовательности, а затем кэшировать все результаты. В этом случае используйте SelectMany
позвонить async
метод для каждого элемента и Replay
кэшировать (плюс Connect
чтобы мяч катился):
public class Consumer
{
private readonly Service _service = new Service();
public IObservable<string> Trigger()
{
var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(_ => RunAsync())
.Replay();
connectible.Connect();
return connectible;
}
public Task<string> RunAsync()
{
return _service.DoAsync();
}
}
Я изменил Results
собственность, которая будет возвращена из Trigger
Вместо этого метод, который, я думаю, имеет больше смысла, поэтому тест теперь выглядит так:
[Test]
public async Task Test()
{
var sut = new Consumer();
var results = sut.Trigger();
var result = await results.FirstAsync();
}
Ответ Пола Беттса работает в большинстве сценариев, но если вы хотите заблокировать поток в ожидании завершения асинхронной функции, вам нужно что-то вроде этого:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(l => Observable.FromAsync(asyncMethod))
.Concat()
.Subscribe();
Или же:
Observable.Interval(TimeSpan.FromSeconds(1))
.Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
.Concat()
.Subscribe();
Измените это на:
Observable.Timer(TimeSpan.FromMilliseconds(100))
.SelectMany(async _ => await RunAsync())
.Subscribe();
Подписка не сохраняет асинхронную операцию внутри наблюдаемой.
Основываясь на ответе Рейджера, я создал метод расширения.
public static IDisposable SubscribeAsync<TResult>(this IObservable<TResult> source, Func<Task> action) =>
source.Select(_ => Observable.FromAsync(action))
.Concat()
.Subscribe();
Если я правильно понимаю, он должен блокироваться до завершения асинхронной задачи. Но это позволит вам позвонить
SubscribeAsync
и передать в вашей задаче. На мой взгляд, это делает его немного более читабельным.
WhenSomethingHappened
.SubscribeAsync(async () => {
await DoFirstAsyncThing();
await DoSecondAsyncThing();
});