Реактивный внутренний номер фиксированный Интервал между асинхронными вызовами, когда вызов длиннее интервала

Вот мой Interval определение:

m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
                .ObserveOn(m_schedulerProvider.EventLoop)
                .Select(l => Observable.FromAsync(DoWork))
                .Concat()
                .Subscribe();

В приведенном выше коде я кормить IScheduler в обоих Interval & ObserveOn из SchedulerProvider так что я могу ускорить юнит-тест ( TestScheduler.AdvanceBy). Также, DoWork является async метод.

В моем конкретном случае я хочу DoWork функция вызывается каждые 5 секунд. Проблема здесь в том, что я хочу, чтобы 5 секунд были временем между окончанием DoWork и начало другого. Так что если DoWork Выполнение занимает более 5 секунд, скажем, 10 секунд, первый вызов будет через 5 секунд, а второй вызов через 15 секунд.

К сожалению, следующий тест доказывает, что он не ведет себя так:

[Fact]
public void MultiPluginStatusHelperShouldWaitForNextQuery()
{    
    m_queryHelperMock
        .Setup(x => x.CustomQueryAsync())
        .Callback(() => Thread.Sleep(10000))
        .Returns(Task.FromResult(new QueryCompletedEventData()))
        .Verifiable()
    ;

    var multiPluginStatusHelper = m_container.GetInstance<IMultiPluginStatusHelper>();
    multiPluginStatusHelper.MillisecondsInterval = 5000;
    m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
    m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);

    m_queryHelperMock.Verify(x => x.CustomQueryAsync(), Times.Once);
}

DoWork вызывает CustomQueryAsync и тест не проходит, говоря, что он был вызван дважды. Его следует вызывать только один раз из-за задержки, вызванной .Callback(() => Thread.Sleep(1000)),

Что я здесь не так делаю?

Моя фактическая реализация исходит из этого примера.

2 ответа

Решение

Эта проблема возникает часто, обычно при опросе какого-либо ненаблюдаемого источника данных. Когда я сталкиваюсь с этим, я использую RepeatAfterDelay Оператор я написал некоторое время назад:

public static IObservable<T> RepeatAfterDelay<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
    var repeatSignal = Observable
        .Empty<T>()
        .Delay(delay, scheduler);

    // when source finishes, wait for the specified
    // delay, then repeat.
    return source.Concat(repeatSignal).Repeat();
}

И вот как я это использую:

// do first set of work immediately, and then every 5 seconds do it again
m_interval = Observable
    .FromAsync(DoWork)
    .RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler)
    .Subscribe();

// wait 5 seconds, then do first set of work, then again every 5 seconds
m_interval = Observable
    .Timer(TimeSpan.FromSeconds(5), scheduler)
    .SelectMany(_ => Observable
        .FromAsync(DoWork)
        .RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler))
    .Subscribe();

Хотя Brandon красивое и чистое, я обнаружил, что оно блокирует поток, Brandon таймера задержки. Неблокирующая альтернатива может выглядеть примерно так:

public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) =>
    source
        .Concat(
            Observable.Create<T>(async observer =>
            {
                await Task.Delay(delay);
                observer.OnCompleted();
            }))
        .Repeat();

Ваша проблема в том, что ваш код смешивается лениво (Observable) и не ленивые (Задача) конструкции. Пока твой первый Task выполняет Interval снова выстрелит и создаст новое задание в Select оператор. Если вы хотите избежать такого поведения, вам нужно обернуть ваш Observable в Defer блок:

m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
                .ObserveOn(m_schedulerProvider.EventLoop)
                 //I think `Defer` implicitly wraps Tasks, if not wrap it in `FromAsync` Again
                .Select(l => Observable.Defer(() => DoWork()))
                .Concat()
                .Subscribe();

Результатом этого является то, что каждый Observable будет выполнять только отложенный Task когда он подписан, т.е. когда предыдущий завершается.

Примечательно, что это имеет проблему, если ваш производитель производит намного быстрее, чем вы можете потреблять, он начнет накапливаться, и каждая ваша память. В качестве альтернативы я бы предложил использовать это GenerateAsync реализация:

    public static IObservable<TOut> GenerateAsync<TResult, TOut>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TimeSpan> timeSelector,
    Func<TResult, TOut> resultSelector,
    IScheduler scheduler = null) 
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TOut>(async obs => {

    //You have to do your initial time delay here.
    var init = await initialState();
    return s.Schedule(init, timeSelector(init), async (state, recurse) => 
    {
      //Check if we are done
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      //Process the result
      obs.OnNext(resultSelector(state));

      //Initiate the next request
      state = await iterate(state);

      //Recursively schedule again
      recurse(state, timeSelector(state));

    });
  });
}

GenerateAsync(DoWork /*Initial state*/, 
              _ => true /*Forever*/, 
              _ => DoWork() /*Do your async task*/,
              _ => TimeSpan.FromSeconds(5) /*Delay between events*/, 
              _ => _ /*Any transformations*/, 
              scheduler)
.Subscribe();

Вышеприведенное устраняет проблему гонок производителя / потребителя, не планируя следующее событие до тех пор, пока не будет выполнено первое.

Другие вопросы по тегам