Как реализовать опрос с использованием Observables?

У меня есть параметризованный вызов rest, который должен выполняться каждые пять секунд с разными параметрами:

Observable<TResult> restCall = api.method1(param1);

Мне нужно создать Observable<TResult> который будет опрашивать restCall каждые 5 секунд с различными значениями для param1. Если вызов API не удается, мне нужно получить ошибку и сделать следующий вызов через 5 секунд. Интервал между вызовами должен измеряться только после завершения restCall (успех / ошибка).

В настоящее время я использую RxJava, но пример.NET также был бы хорош.

2 ответа

Решение

Вступление

Во-первых, признание, я парень.NET, и я знаю, что этот подход использует некоторые идиомы, которые не имеют прямого эквивалента в Java. Но я верю вам на слово и буду исходить из того, что это отличный вопрос, который понравится ребятам.NET, и, надеюсь, он приведет вас по верному пути в rx-java, на который я никогда не смотрел. Это довольно длинный ответ, но в основном это объяснение - сам код решения довольно короткий!

Использование либо

Сначала нам нужно разобраться с некоторыми инструментами, чтобы помочь с этим решением. Во-первых, это использование Either<TLeft, TRight> тип. Это важно, потому что у вас есть два возможных результата каждого вызова: либо хороший результат, либо ошибка. Но нам нужно обернуть их в один тип - мы не можем использовать OnError для отправки ошибок назад, поскольку это приведет к прекращению потока результатов. Любой из них немного похож на Tuple и позволяет легче справляться с этой ситуацией. Библиотека Rxx имеет очень полную и хорошую реализацию Either, но вот простой общий пример использования, за которым следует простая реализация, подходящая для наших целей:

var goodResult = Either.Right<Exception,int>(1);
var exception = Either.Left<Exception,int>(new Exception());

/* base class for LeftValue and RightValue types */
public abstract class Either<TLeft, TRight>
{
    public abstract bool IsLeft { get; }
    public bool IsRight { get { return !IsLeft; } }
    public abstract TLeft Left { get; }
    public abstract TRight Right { get;  }    
}

public static class Either
{
    public sealed class LeftValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TLeft _leftValue;

        public LeftValue(TLeft leftValue)
        {
            _leftValue = leftValue;
        }

        public override TLeft Left { get { return _leftValue; } }
        public override TRight Right { get { return default(TRight); } }
        public override bool IsLeft { get { return true; } }
    }

    public sealed class RightValue<TLeft, TRight> : Either<TLeft, TRight>
    {
        TRight _rightValue;

        public RightValue(TRight rightValue)
        {
            _rightValue = rightValue;
        }

        public override TLeft Left { get { return default(TLeft); } }
        public override TRight Right { get { return _rightValue; } }
        public override bool IsLeft { get { return false; } }
    }

    // Factory functions to create left or right-valued Either instances
    public static Either<TLeft, TRight> Left<TLeft, TRight>(TLeft leftValue)
    {
        return new LeftValue<TLeft, TRight>(leftValue);
    }

    public static Either<TLeft, TRight> Right<TLeft, TRight>(TRight rightValue)
    {
        return new RightValue<TLeft, TRight>(rightValue);
    }
}

Обратите внимание, что по соглашению при использовании Either для моделирования успеха или неудачи для успешного значения используется правая сторона, потому что это, конечно, "Right":)

Некоторые вспомогательные функции

Я собираюсь смоделировать два аспекта вашей проблемы с некоторыми вспомогательными функциями. Во-первых, здесь есть фабрика для генерации параметров - каждый раз, когда она вызывается, она возвращает следующее целое число в последовательности целых чисел, начиная с 1:

// An infinite supply of parameters
private static int count = 0;
public int ParameterFactory()
{
    return ++count; 
}

Далее, вот функция, которая имитирует ваш вызов Rest как IObservable. Эта функция принимает целое число и:

  • Если целое число четное, оно возвращает Observable, который немедленно отправляет OnError.
  • Если целое число нечетное, оно возвращает строку, объединяющую целое число с "-ret", но только через секунду. Мы будем использовать это для проверки того, что интервал опроса ведет себя так, как вы просили - как пауза между завершенными вызовами, сколько бы времени они ни занимали, а не регулярный интервал.

Вот:

// A asynchronous function representing the REST call
public IObservable<string> SomeRestCall(int x)
{
    return x % 2 == 0
        ? Observable.Throw<string>(new Exception())
        : Observable.Return(x + "-ret").Delay(TimeSpan.FromSeconds(1));   
}

Теперь хороший бит

Ниже приведена достаточно универсальная функция многократного использования, которую я назвал Poll, Он принимает асинхронную функцию, которая будет опрашиваться, фабрику параметров для этой функции, желаемый интервал отдыха (без каламбура!) И, наконец, IScheduler для использования.

Самый простой подход, который я мог придумать, это использовать Observable.Create который использует планировщик для управления потоком результатов. ScheduleAsync способ планирования, использующий асинхронную / ожидающую форму.NET Это.NET идиома, которая позволяет вам писать асинхронный код в обязательном порядке. async Ключевое слово вводит асинхронную функцию, которая может затем await один или несколько асинхронных вызовов в его теле и будут продолжаться только после завершения вызова. Я написал длинное объяснение этого стиля планирования в этом вопросе, который включает в себя более старый рекурсивный стиль, который может быть легче реализовать в подходе rx-java. Код выглядит так:

public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
    Func<TArg, IObservable<TResult>> asyncFunction,
    Func<TArg> parameterFactory,
    TimeSpan interval,
    IScheduler scheduler)
{
    return Observable.Create<Either<Exception, TResult>>(observer =>
    {
        return scheduler.ScheduleAsync(async (ctrl, ct) => {
            while(!ct.IsCancellationRequested)
            {
                try
                {
                    var result = await asyncFunction(parameterFactory());
                    observer.OnNext(Either.Right<Exception,TResult>(result));
                }
                catch(Exception ex)
                {
                    observer.OnNext(Either.Left<Exception, TResult>(ex));
                }
                await ctrl.Sleep(interval, ct);
            }
        });        
    });    
}

Разбивая это, Observable.Create В общем, это фабрика для создания IObservables, которая дает вам большой контроль над тем, как результаты публикуются для наблюдателей. Это часто упускается из виду в пользу неоправданно сложной композиции примитивов.

В этом случае мы используем его для создания потока Either<TResult, Exception> так что мы можем вернуть успешные и неудачные результаты опроса.

Create Функция принимает наблюдателя, который представляет подписчика, которому мы передаем результаты через OnNext/OnError/OnCompleted. Нам нужно вернуть IDisposable в пределах Create вызов - в.NET это дескриптор, с помощью которого подписчик может отменить свою подписку. Это особенно важно, потому что в противном случае опрос будет продолжаться вечно - или, по крайней мере, никогда OnComplete,

Результат ScheduleAsync (или просто Schedule) такая ручка. При утилизации он отменит любое ожидающее событие, которое мы запланировали, и тем самым завершит цикл опроса. В нашем случае Sleep мы используем для управления интервалом отменяемую операцию, хотя функцию опроса можно легко изменить, чтобы принять отменяемую asyncFunction который принимает CancellationToken также.

Метод ScheduleAsync принимает функцию, которая будет вызываться для планирования событий. Передано два аргумента, первый ctrl это сам планировщик. Второй ct это аннулирование, которое мы можем использовать, чтобы узнать, было ли запрошено аннулирование (подписчиком, использующим свой дескриптор подписки).

Сам опрос выполняется через бесконечный цикл while, который завершается только в том случае, если CancellationToken указывает, что было отменено запрос.

В цикле мы можем использовать магию async/await, чтобы асинхронно вызывать функцию опроса, но по-прежнему заключать ее в обработчик исключений. Это так круто! При условии отсутствия ошибок, мы отправляем результат как правильное значение Either к наблюдателю через OnNext, Если было исключение, мы отправляем это как левое значение Either наблюдателю. Наконец, мы используем Sleep функция в планировщике для планирования пробуждения после интервала отдыха - не путать с Thread.Sleep вызовите, этот обычно не блокирует никакие потоки. Обратите внимание, что сон принимает CancellationToken позволяя также прервать это!

Я думаю, вы согласитесь, что это довольно классное использование async/await, чтобы упростить то, что было бы очень сложной задачей!

Пример использования

Наконец, вот тестовый код, который вызывает Pollвместе с примером вывода - для поклонников LINQPad весь код в этом ответе будет выполняться в LINQPad со ссылками на сборки Rx 2.1:

void Main()
{
    var subscription = Poll(SomeRestCall,
                            ParameterFactory,
                            TimeSpan.FromSeconds(5),
                            ThreadPoolScheduler.Instance)
        .TimeInterval()                            
        .Subscribe(x => {
            Console.Write("Interval: " + x.Interval);
            var result = x.Value;
            if(result.IsRight)
                Console.WriteLine(" Success: " + result.Right);
            else
                Console.WriteLine(" Error: " + result.Left.Message);
        });

    Console.ReadLine();    
    subscription.Dispose();
}

Interval: 00:00:01.0027668 Success: 1-ret
Interval: 00:00:05.0012461 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0009684 Success: 3-ret
Interval: 00:00:05.0003127 Error: Exception of type 'System.Exception' was thrown.
Interval: 00:00:06.0113053 Success: 5-ret
Interval: 00:00:05.0013136 Error: Exception of type 'System.Exception' was thrown.

Обратите внимание, что интервал между результатами составляет либо 5 секунд (интервал опроса), если ошибка была немедленно возвращена, либо 6 секунд (интервал опроса плюс длительность имитированного вызова REST) ​​для успешного результата.

РЕДАКТИРОВАТЬ - Вот альтернативная реализация, которая не использует ScheduleAsync, но использует рекурсивное планирование старого стиля и без синтаксиса async/await. Как видите, все намного сложнее - но он также поддерживает отмену наблюдаемой функции asyncFunction.

    public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
        Func<TArg, IObservable<TResult>> asyncFunction,
        Func<TArg> parameterFactory,
        TimeSpan interval,
        IScheduler scheduler)
    {
        return Observable.Create<Either<Exception, TResult>>(
            observer =>
                {
                    var disposable = new CompositeDisposable();
                    var funcDisposable = new SerialDisposable();
                    bool cancelRequested = false;
                    disposable.Add(Disposable.Create(() => { cancelRequested = true; }));
                    disposable.Add(funcDisposable);
                    disposable.Add(scheduler.Schedule(interval, self =>
                        {
                            funcDisposable.Disposable = asyncFunction(parameterFactory())
                                .Finally(() =>
                                    {
                                        if (!cancelRequested) self(interval);
                                    })
                                .Subscribe(
                                    res => observer.OnNext(Either.Right<Exception, TResult>(res)),
                                    ex => observer.OnNext(Either.Left<Exception, TResult>(ex)));
                        }));

                    return disposable;

                });
    }

Смотрите мой другой ответ для другого подхода, который избегает.NET 4.5 асинхронных / ожидающих функций и не использует расписание вызовов.

Я надеюсь, что это поможет ребятам из RX-Java!

Я очистил подход, который не использует вызов Schedule напрямую - используя тип Either из моего другого ответа - он также будет работать с тем же тестовым кодом и даст те же результаты:

    public IObservable<Either<Exception, TResult>> Poll2<TResult, TArg>(
        Func<TArg, IObservable<TResult>> asyncFunction,
        Func<TArg> parameterFactory,
        TimeSpan interval,
        IScheduler scheduler)
    {
        return Observable.Create<Either<Exception, TResult>>(
            observer =>
                Observable.Defer(() => asyncFunction(parameterFactory()))
                          .Select(Either.Right<Exception, TResult>)
                          .Catch<Either<Exception, TResult>, Exception>(
                            ex => Observable.Return(Either.Left<Exception, TResult>(ex)))
                          .Concat(Observable.Defer(
                            () => Observable.Empty<Either<Exception, TResult>>()
                                            .Delay(interval, scheduler)))
                          .Repeat().Subscribe(observer));
    }

Это имеет правильную семантику отмены.

Замечания по реализации

  • Вся конструкция использует Repeat для получения циклического поведения.
  • Начальная задержка используется, чтобы гарантировать, что на каждой итерации передается другой параметр
  • Функция Select проецирует результат OnNext на Either на правой стороне
  • Catch проецирует результат OnError на Either на левой стороне - обратите внимание, что это исключение завершает текущую наблюдаемую asyncFunction, следовательно, необходимость повторения
  • Concat добавляет интервал задержки

Мое мнение таково, что версия расписания более удобочитаема, но эта версия не использует async /await и поэтому совместима с.NET 4.0.

Другие вопросы по тегам