Как внедрить динамический интервал времени в Reactive Extension

У меня есть сценарий, где интервал таймера изменяется при каждом событии тика. Как показано в приведенном ниже коде:

    Timer tmrObj = new Timer();
    tmrObj.Interval = TimeSpan.FromSeconds(11);
    tmrObj.Tick += TimerTickHandler;

   public void TimerTickHandler(EventArg arg)
   {
     tmrObj.pause();

     var response = MakeSomeServiceCall();
     tmr.Interval = response.Interval;

     tmrObj.resume();
   }

Если мне нужно реализовать таймеры в Rx для того же. Я могу добиться с помощью функции таймера. Но как я могу манипулировать Интервалом на тике события, как показано в приведенном выше коде. Текущая реализация интервала таймера выглядит следующим образом:

var serviceCall = Observable.FromAsync<DataResponse>(MakeServiceCall);
var timerCall = Observable.Timer(TimeSpan.FromSeconds(100));

var response = from timer in timerCall
               from reponse in serviceCall.TakeUntil(timerCall)
               .Select(result => result); 

1 ответ

Решение

Ты можешь использовать Generate обрабатывать генерацию данных, если она не асинхронная. Если ваш метод будет использовать асинхронный, хотя вы можете свернуть свой собственный GenerateAsync метод:

public static IObservable<TOut> GenerateAsync<TResult, TOut>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TimeSpan> timeSelector,
    Func<TResult, TOut> resultSelector,
    IScheduler scheduler = null) 
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TOut>(async obs => {

    //You have to do your initial time delay here.
    var init = await initialState();
    return s.Schedule(init, timeSelector(init), async (state, recurse) => 
    {
      //Check if we are done
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      //Process the result
      obs.OnNext(resultSelector(state));

      //Initiate the next request
      state = await iterate(state);

      //Recursively schedule again
      recurse(state, timeSelector(state));

    });
  });
}

Смотрите оригинальный ответ

Вы можете использовать это так:

var timeStream = ObservableStatic.GenerateAsync(
  () => MakeServiceCall(),
  _ => true,
  _ => MakeServiceCall(),
  result => result.Interval,
  _ => _);
Другие вопросы по тегам