Как перезвонить асинхронной функции из RX подписаться?

Я хотел бы перезвонить асинхронной функции в рамках подписки Rx.

Например, вот так:

public class Consumer
{
    private readonly Service _service = new Service();

    public ReplaySubject<string> Results = new ReplaySubject<string>();

    public void Trigger()
    {
        Observable.Timer(TimeSpan.FromMilliseconds(100)).Subscribe(async _ => await RunAsync());
    }

    public Task RunAsync()
    {
        return _service.DoAsync();
    }
}

public class Service
{
    public async Task<string> DoAsync()
    {
        return await Task.Run(() => Do());
    }

    private static string Do()
    {
        Thread.Sleep(TimeSpan.FromMilliseconds(200));
        throw new ArgumentException("invalid!");
        return "foobar";
    }
}

[Test]
public async Task Test()
{
    var sut = new Consumer();
    sut.Trigger();
    var result = await sut.Results.FirstAsync();
}

Что нужно сделать, чтобы правильно перехватить исключение?

4 ответа

Решение

Вы не хотите передавать async метод для Subscribe, потому что это создаст async void метод. Делай все возможное, чтобы избежать async void,

В вашем случае, я думаю, что вы хотите, чтобы позвонить async метод для каждого элемента последовательности, а затем кэшировать все результаты. В этом случае используйте SelectMany позвонить async метод для каждого элемента и Replay кэшировать (плюс Connect чтобы мяч катился):

public class Consumer
{
    private readonly Service _service = new Service();

    public IObservable<string> Trigger()
    {
        var connectible = Observable.Timer(TimeSpan.FromMilliseconds(100))
            .SelectMany(_ => RunAsync())
            .Replay();
        connectible.Connect();
        return connectible;
    }

    public Task<string> RunAsync()
    {
        return _service.DoAsync();
    }
}

Я изменил Results собственность, которая будет возвращена из Trigger Вместо этого метод, который, я думаю, имеет больше смысла, поэтому тест теперь выглядит так:

[Test]
public async Task Test()
{
    var sut = new Consumer();
    var results = sut.Trigger();
    var result = await results.FirstAsync();
}

Ответ Пола Беттса работает в большинстве сценариев, но если вы хотите заблокировать поток в ожидании завершения асинхронной функции, вам нужно что-то вроде этого:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(l => Observable.FromAsync(asyncMethod))
          .Concat()
          .Subscribe();

Или же:

Observable.Interval(TimeSpan.FromSeconds(1))
          .Select(_ => Observable.Defer(() => asyncMethod().ToObservable()))
          .Concat()
          .Subscribe();

Измените это на:

Observable.Timer(TimeSpan.FromMilliseconds(100))
    .SelectMany(async _ => await RunAsync())
    .Subscribe();

Подписка не сохраняет асинхронную операцию внутри наблюдаемой.

Основываясь на ответе Рейджера, я создал метод расширения.

      public static IDisposable SubscribeAsync<TResult>(this IObservable<TResult> source, Func<Task> action) =>
            source.Select(_ => Observable.FromAsync(action))
                .Concat()
                .Subscribe();

Если я правильно понимаю, он должен блокироваться до завершения асинхронной задачи. Но это позволит вам позвонить SubscribeAsyncи передать в вашей задаче. На мой взгляд, это делает его немного более читабельным.

      WhenSomethingHappened
    .SubscribeAsync(async () => { 
        await DoFirstAsyncThing(); 
        await DoSecondAsyncThing(); 
    });

Другие вопросы по тегам