Реактивный внутренний номер фиксированный Интервал между асинхронными вызовами, когда вызов длиннее интервала
Вот мой Interval
определение:
m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
.ObserveOn(m_schedulerProvider.EventLoop)
.Select(l => Observable.FromAsync(DoWork))
.Concat()
.Subscribe();
В приведенном выше коде я кормить IScheduler
в обоих Interval
& ObserveOn
из SchedulerProvider
так что я могу ускорить юнит-тест ( TestScheduler.AdvanceBy). Также, DoWork
является async
метод.
В моем конкретном случае я хочу DoWork
функция вызывается каждые 5 секунд. Проблема здесь в том, что я хочу, чтобы 5 секунд были временем между окончанием DoWork
и начало другого. Так что если DoWork
Выполнение занимает более 5 секунд, скажем, 10 секунд, первый вызов будет через 5 секунд, а второй вызов через 15 секунд.
К сожалению, следующий тест доказывает, что он не ведет себя так:
[Fact]
public void MultiPluginStatusHelperShouldWaitForNextQuery()
{
m_queryHelperMock
.Setup(x => x.CustomQueryAsync())
.Callback(() => Thread.Sleep(10000))
.Returns(Task.FromResult(new QueryCompletedEventData()))
.Verifiable()
;
var multiPluginStatusHelper = m_container.GetInstance<IMultiPluginStatusHelper>();
multiPluginStatusHelper.MillisecondsInterval = 5000;
m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
m_testSchedulerProvider.EventLoopScheduler.AdvanceBy(TimeSpan.FromMilliseconds(5000).Ticks);
m_queryHelperMock.Verify(x => x.CustomQueryAsync(), Times.Once);
}
DoWork
вызывает CustomQueryAsync
и тест не проходит, говоря, что он был вызван дважды. Его следует вызывать только один раз из-за задержки, вызванной .Callback(() => Thread.Sleep(1000))
,
Что я здесь не так делаю?
Моя фактическая реализация исходит из этого примера.
2 ответа
Эта проблема возникает часто, обычно при опросе какого-либо ненаблюдаемого источника данных. Когда я сталкиваюсь с этим, я использую RepeatAfterDelay
Оператор я написал некоторое время назад:
public static IObservable<T> RepeatAfterDelay<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
var repeatSignal = Observable
.Empty<T>()
.Delay(delay, scheduler);
// when source finishes, wait for the specified
// delay, then repeat.
return source.Concat(repeatSignal).Repeat();
}
И вот как я это использую:
// do first set of work immediately, and then every 5 seconds do it again
m_interval = Observable
.FromAsync(DoWork)
.RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler)
.Subscribe();
// wait 5 seconds, then do first set of work, then again every 5 seconds
m_interval = Observable
.Timer(TimeSpan.FromSeconds(5), scheduler)
.SelectMany(_ => Observable
.FromAsync(DoWork)
.RepeatAfterDelay(TimeSpan.FromSeconds(5), scheduler))
.Subscribe();
Хотя Brandon красивое и чистое, я обнаружил, что оно блокирует поток, Brandon таймера задержки. Неблокирующая альтернатива может выглядеть примерно так:
public static IObservable<T> DelayRepeat<T>(this IObservable<T> source, TimeSpan delay) =>
source
.Concat(
Observable.Create<T>(async observer =>
{
await Task.Delay(delay);
observer.OnCompleted();
}))
.Repeat();
Ваша проблема в том, что ваш код смешивается лениво (Observable
) и не ленивые (Задача) конструкции. Пока твой первый Task
выполняет Interval
снова выстрелит и создаст новое задание в Select
оператор. Если вы хотите избежать такого поведения, вам нужно обернуть ваш Observable в Defer
блок:
m_interval = Observable.Interval(TimeSpan.FromSeconds(5), m_schedulerProvider.EventLoop)
.ObserveOn(m_schedulerProvider.EventLoop)
//I think `Defer` implicitly wraps Tasks, if not wrap it in `FromAsync` Again
.Select(l => Observable.Defer(() => DoWork()))
.Concat()
.Subscribe();
Результатом этого является то, что каждый Observable
будет выполнять только отложенный Task
когда он подписан, т.е. когда предыдущий завершается.
Примечательно, что это имеет проблему, если ваш производитель производит намного быстрее, чем вы можете потреблять, он начнет накапливаться, и каждая ваша память. В качестве альтернативы я бы предложил использовать это GenerateAsync
реализация:
public static IObservable<TOut> GenerateAsync<TResult, TOut>(
Func<Task<TResult>> initialState,
Func<TResult, bool> condition,
Func<TResult, Task<TResult>> iterate,
Func<TResult, TimeSpan> timeSelector,
Func<TResult, TOut> resultSelector,
IScheduler scheduler = null)
{
var s = scheduler ?? Scheduler.Default;
return Observable.Create<TOut>(async obs => {
//You have to do your initial time delay here.
var init = await initialState();
return s.Schedule(init, timeSelector(init), async (state, recurse) =>
{
//Check if we are done
if (!condition(state))
{
obs.OnCompleted();
return;
}
//Process the result
obs.OnNext(resultSelector(state));
//Initiate the next request
state = await iterate(state);
//Recursively schedule again
recurse(state, timeSelector(state));
});
});
}
GenerateAsync(DoWork /*Initial state*/,
_ => true /*Forever*/,
_ => DoWork() /*Do your async task*/,
_ => TimeSpan.FromSeconds(5) /*Delay between events*/,
_ => _ /*Any transformations*/,
scheduler)
.Subscribe();
Вышеприведенное устраняет проблему гонок производителя / потребителя, не планируя следующее событие до тех пор, пока не будет выполнено первое.