Наблюдаемый от Цепных Задач

Я пытаюсь создать Observable, где каждый элемент создается с помощью асинхронной задачи. Следующий элемент должен быть произведен посредством асинхронного вызова на результат предыдущего элемента (ко-рекурсия). На языке "Generate" это будет выглядеть примерно так - за исключением того, что Generate не поддерживает асинхронный режим (и при этом он не поддерживает делегат в исходном состоянии).

var ob = Observable.Generate(
   async () => await ProduceFirst(),        // Task<T> ProduceFirst()
   prev => Continue(prev)                   // bool Continue(T);
   async prev => await ProduceNext(prev)    // Task<T> ProduceNext(T)
   item => item
);

В качестве более конкретного примера, чтобы просмотреть все сообщения из очереди ServiceBus, извлекая их по 100 сообщений за раз, реализуйте ProduceFirst, Continue и ProduceNext следующим образом:

Task<IEnumerable<BrokeredMessage>> ProduceFirst() 
{
    const int batchSize = 100;
    return _serviceBusReceiver.PeekBatchAsync(batchSize);
}

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

async Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    const int batchSize = 100;
    return (await _serviceBusReceiver.PeekBatchAsync(prev.Last().SequenceNumber, batchSize + 1)).Skip(1)
}

А потом позвони .SelectMany(i => i) на IObservable<IEnumerable<BrokeredMessage>> превратить это в IObservable<BrokeredMessage>

Где _serviceBusReceiver является экземпляром интерфейса следующим образом:

public interface IServiceBusReceiver
{
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(int batchSize);
    Task<IEnumerable<BrokeredMessage>> PeekBatchAsync(long fromSequenceNumber, int batchSize);
}

И BrokeredMessage от https://msdn.microsoft.com/en-us/library/microsoft.servicebus.messaging.brokeredmessage.aspx

4 ответа

Решение

Если вы собираетесь накатить свой собственный асинхронный Generate Я бы порекомендовал использовать рекурсивное планирование вместо переноса цикла while.

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TResult>(async obs => {
    return s.Schedule(await initialState(), async (state, self) => 
    {
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      obs.OnNext(resultSelector(state));

      self(await iterate(state));

    });
  });
}

Это имеет пару преимуществ. Во-первых, вы можете отменить это, используя простой цикл while, и вы не можете отменить его напрямую, фактически вы даже не возвращаетесь к функции подписки, пока наблюдаемое не завершится. Во-вторых, это позволяет вам контролировать планирование / асинхронность каждого элемента (что делает тестирование быстрым), это также делает его более подходящим для библиотеки.

После тщательного тестирования я думаю, что это хорошо работает с использованием встроенных операторов Rx.

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
    return Observable.Create<TResult>(o =>
    {
        var current = default(TResult);
        return
            Observable
                .FromAsync(initialState)
                .Select(y => resultSelector(y))
                .Do(c => current = c)
                .Select(x =>
                    Observable
                        .While(
                            () => condition(current),
                            Observable
                                .FromAsync(() => iterate(current))
                                .Select(y => resultSelector(y))
                        .Do(c => current = c))
                        .StartWith(x))
                .Switch()
                .Where(x => condition(x))
                .ObserveOn(scheduler ?? Scheduler.Default)
                .Subscribe(o);
    });
}

Я проверил этот код со следующим:

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
    return
        Task.FromResult(
            EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = 1
                }));
}

Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    return Task.FromResult(
        prev.Last().SequenceNumber < 3
            ? EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = prev.Last().SequenceNumber + 1 
                })
            : Enumerable.Empty<BrokeredMessage>());
}

public class BrokeredMessage
{
    public int SequenceNumber;
}

И работает эта последовательность:

var ob = Generate(
    async () => await ProduceFirst(),
    prev => Continue(prev),
    async prev => await ProduceNext(prev),
    item => item);

Я получил этот результат:

результат

В моем тестовом коде также использовались интерактивные расширения команды Reactive Extension - NuGet "Ix-Main".

У меня такой же вопрос, а также я согласен со следующим комментарием:

Возможно, я нарушаю дух реактивной парадигмы, но это то, что мне нужно в данный момент - он не должен продолжать извлекать сообщения из очереди, пока они не будут обработаны (по крайней мере, в ближайшем будущем).

я полагаю, что IAsyncEnumerable из Ix.NET лучше подходит IObservable для этого сценария - как для вопроса здесь, так и для любой подобной функции асинхронного развертывания. Причина в том, что каждый раз, когда мы повторяем, а затем извлекаем результат из Taskуправление потоком находится у нас, вызывающего, чтобы вытащить следующий элемент или отказаться от выполнения определенного условия. Это как IAsyncEnumerable и не как IObservable, что подталкивает нас к нам, не имея контроля над скоростью.

Ix.NET не имеет подходящей версии AsyncEnumerable.Generate поэтому я написал следующее, чтобы решить эту проблему.

   public static IAsyncEnumerable<TState> Generate<TState>(TState initialState, Func<TState, bool> condition, Func<TState, Task<TState>> iterate)
    {
        return AsyncEnumerable.CreateEnumerable(() =>
        {
            var started = false;
            var current = default(TState);
            return AsyncEnumerable.CreateEnumerator(async c =>
            {

                if (!started)
                {
                    started = true;
                    var conditionMet = !c.IsCancellationRequested && condition(initialState);
                    if (conditionMet) current = initialState;
                    return conditionMet;
                }
                {
                    var newVal = await iterate(current).ConfigureAwait(false);
                    var conditionMet = !c.IsCancellationRequested && condition(newVal);
                    if (conditionMet) current = newVal;
                    return conditionMet;
                }

            },
                () => current,
                () => { });
        });



    }

Заметки:

  • Только очень легко проверено.
  • Возвращает ли исходное состояние.
  • Не возвращает первое состояние TS, которое не соответствует условию, даже если он выполнил работу, чтобы получить этот результат. Возможно, другая версия может включать это.
  • Я бы предпочел избавиться от condition параметр, так как, поскольку это система вытягивания, вызывающий объект полностью зависит от того, вызывать MoveNext или нет, и так далее. conditionкажется излишним. Это по существу добавляет вызов TakeWhile на результат функции. Однако я недостаточно глубоко изучил Ix.NET, чтобы узнать, false ответ от MoveNext требуется для того, чтобы dispose IAsyncEnumeratorпо этой причине я включил его.

IAsyncEnumerable конечно может быть преобразован в IObservable если этот конкретный тип требуется.

Вот еще одна реализация, вдохновленная ответом Enigmativity. Он использует новые языковые функции ( деконструкция кортежа C# 7).

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null)
{
    return Observable.Create<TResult>(observer =>
    {
        var (isFirst, current) = (true, default(TResult));
        return Observable
            .While(() => isFirst || condition(current),
                Observable.If(() => isFirst,
                    Observable.FromAsync(ct => initialState()),
                    Observable.FromAsync(ct => iterate(current))
                )
            )
            .Do(x => (isFirst, current) = (false, x))
            .Select(x => resultSelector(x))
            .ObserveOn(scheduler ?? Scheduler.Immediate)
            .Subscribe(observer);
    });
}

Я думаю, что это может быть правильный ответ:

Это не хороший ответ. Не использовать.

Я создал собственный Generate который поддерживает async / await в начальном состоянии + итеративные функции:

    public static IObservable<TResult> Generate<TResult>(
        Func<Task<TResult>> initialState,
        Func<TResult, bool> condition,
        Func<TResult, Task<TResult>> iterate,
        Func<TResult, TResult> resultSelector
        )
    {
        return Observable.Create<TResult>(async obs =>
        {
            var state = await initialState();

            while (condition(state))
            {
                var result = resultSelector(state);
                obs.OnNext(result);
                state = await iterate(state);
            }

            obs.OnCompleted();

            return System.Reactive.Disposables.Disposable.Empty;
        });
    }

К сожалению, это, кажется, имеет побочный эффект, что производство сообщений значительно опережает потребление. Если наблюдатель обрабатывает сообщения медленно, он получит миллионы сообщений, прежде чем мы обработаем их несколько. Не совсем то, что мы хотим от служебного автобуса.

Я собираюсь проработать все вышеизложенное, возможно, прочту еще немного, и при необходимости опубликую более конкретный вопрос.

Другие вопросы по тегам