Как отменить наблюдаемую последовательность

У меня очень просто IObservable<int> который действует как генератор импульсов каждые 500 мс:

var pulses = Observable.GenerateWithTime(0, i => true, i => i + 1, i => i,
                                         i => TimeSpan.FromMilliseconds(500))

И у меня есть CancellationTokenSource (это используется для отмены другой работы, которая выполняется одновременно).

Как я могу использовать источник токенов отмены, чтобы отменить мою наблюдаемую последовательность?

5 ответов

Решение

Если вы используете GenerateWithTime (теперь заменен на Generate, передавая перегрузку func-функции), вы можете заменить второй параметр, чтобы определить состояние токена отмены следующим образом:

var pulses = Observable.Generate(0,
    i => !ts.IsCancellationRequested,
    i => i + 1,
    i => i,
    i => TimeSpan.FromMilliseconds(500));

В качестве альтернативы, если ваше событие, которое вызывает установку токена отмены, может быть преобразовано в саму наблюдаемую, вы можете использовать что-то вроде следующего:

pulses.TakeUntil(CancelRequested);

Более подробное объяснение я разместил по адресу http://www.thinqlinq.com/Post.aspx/Title/Cancelling-a-Reactive-Extensions-Observable.

Это старая ветка, но для дальнейшего использования, здесь есть более простой способ сделать это.

Если у вас есть CancellationToken, вы, вероятно, уже работаете с задачами. Итак, просто преобразуйте его в задачу и позвольте фреймворку выполнить связывание:

using System.Reactive.Threading.Tasks;
...
var task = myObservable.ToTask(cancellationToken);

Это создаст внутреннего подписчика, который будет удален при отмене задачи. Это сделает свое дело в большинстве случаев, потому что большинство наблюдаемых производят значения только при наличии подписчиков.

Теперь, если у вас есть фактическая наблюдаемая, которую нужно по какой-то причине устранить (возможно, горячая наблюдаемая, которая больше не важна, если родительская задача отменена), этого можно достичь с помощью продолжения:

disposableObservable.ToTask(cancellationToken).ContinueWith(t => {
    if (t.Status == TaskStatus.Canceled)
        disposableObservable.Dispose();
});

Вот два удобных оператора для отмены наблюдаемых последовательностей. Разница между ними заключается в том, что происходит в случае отмены. TakeUntilвызывает нормальное завершение последовательности ( OnCompleted), в то время как WithCancellationвызывает исключительное завершение ( OnError).

      /// <summary>Returns the elements from the source observable sequence until the
/// CancellationToken is canceled.</summary>
public static IObservable<TSource> TakeUntil<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(observer =>
            cancellationToken.Register(() => observer.OnNext(default))));
}

/// <summary>Ties a CancellationToken to an observable sequence. In case of
/// cancellation propagates an OperationCanceledException to the observer.</summary>
public static IObservable<TSource> WithCancellation<TSource>(
    this IObservable<TSource> source, CancellationToken cancellationToken)
{
    return source
        .TakeUntil(Observable.Create<Unit>(o => cancellationToken.Register(() =>
            o.OnError(new OperationCanceledException(cancellationToken)))));
}

Пример использования:

      var cts = new CancellationTokenSource(TimeSpan.FromSeconds(30));

var pulses = Observable
    .Generate(0, i => true, i => i + 1, i => i, i => TimeSpan.FromMilliseconds(500))
    .WithCancellation(cts.Token);

Примечание. В случае отмены пользовательские операторы, представленные выше, мгновенно отписываются от базовой наблюдаемой. Это нужно учитывать, если наблюдаемые включают побочные эффекты. Помещение TakeUntil(cts.Token)прежде чем оператор, который выполняет побочные эффекты, отложит завершение всего наблюдаемого до тех пор, пока побочные эффекты не будут завершены (изящное завершение ). Размещение его после побочных эффектов сделает отмену мгновенной, в результате чего потенциально любой запущенный код продолжит работу незамеченным в режиме «запустил и забыл».

Вы можете подключить свой IObservable подписка с CancellationTokenSource со следующим фрагментом

var pulses = Observable.GenerateWithTime(0,
    i => true, i => i + 1, i => i,
    i => TimeSpan.FromMilliseconds(500));

// Get your CancellationTokenSource
CancellationTokenSource ts = ...

// Subscribe
ts.Token.Register(pulses.Subscribe(...).Dispose);

Вы получаете IDisposable Экземпляр вернулся с подписки. Вызов Dispose() на что.

Другие вопросы по тегам