Отмена длинного потока Rx

Если у меня есть поток с длительной работой, что-то вроде:

inputStream.Select(n => Task.Run(() =>
{
    // Long running operation
    Thread.Sleep(TimeSpan.FromSeconds(5));

    return n * n;
}).ToObservable())
.Switch()
.Subscribe(result => 
{
    // Use result in some way
    Console.WriteLine(result);
});

Как я могу получить аннулирование внутри Task.Run звоните так, чтобы когда Switch удаляет подписку для расчета в полете, он устанавливает CancellationToken как отмененный, поэтому я знаю, чтобы прервать вычисление.

1 ответ

Решение

Вы можете использовать Observable.StartAsync метод, например

inputStream.Select(n => Observable.StartAsync((token => Task.Run(() =>
{
    if (token.IsCancellationRequested)
    {
        // .. don't need to do anything
        return 0;
    }
    else
    {
        Thread.Sleep(TimeSpan.FromSeconds(1));
        return n * n;

    }                   
}))))
.Switch()
.Subscribe(Console.WriteLine);

В качестве альтернативы, если вы будете создавать несколько значений, вы можете использовать Observable.Create перегрузка, которая работает с Task чтобы получить CancellationToken. Например

inputStream.Select(n => Observable.Create<int>((observer, token) => Task.Run(() =>
{
    while (!token.IsCancellationRequested)
    {
        Thread.Sleep(TimeSpan.FromSeconds(1));
        observer.OnNext(n * n);
    }

    observer.OnCompleted();
})))
.Switch()
.Subscribe(Console.WriteLine);

Внутри вашей задачи вам нужно позвонить OnNext производить ценности. Возвращаемое значение задачи, если оно есть, игнорируется.

Другие вопросы по тегам