Отмена длинного потока Rx
Если у меня есть поток с длительной работой, что-то вроде:
inputStream.Select(n => Task.Run(() =>
{
// Long running operation
Thread.Sleep(TimeSpan.FromSeconds(5));
return n * n;
}).ToObservable())
.Switch()
.Subscribe(result =>
{
// Use result in some way
Console.WriteLine(result);
});
Как я могу получить аннулирование внутри Task.Run
звоните так, чтобы когда Switch
удаляет подписку для расчета в полете, он устанавливает CancellationToken как отмененный, поэтому я знаю, чтобы прервать вычисление.
1 ответ
Решение
Вы можете использовать Observable.StartAsync
метод, например
inputStream.Select(n => Observable.StartAsync((token => Task.Run(() =>
{
if (token.IsCancellationRequested)
{
// .. don't need to do anything
return 0;
}
else
{
Thread.Sleep(TimeSpan.FromSeconds(1));
return n * n;
}
}))))
.Switch()
.Subscribe(Console.WriteLine);
В качестве альтернативы, если вы будете создавать несколько значений, вы можете использовать Observable.Create
перегрузка, которая работает с Task
чтобы получить CancellationToken. Например
inputStream.Select(n => Observable.Create<int>((observer, token) => Task.Run(() =>
{
while (!token.IsCancellationRequested)
{
Thread.Sleep(TimeSpan.FromSeconds(1));
observer.OnNext(n * n);
}
observer.OnCompleted();
})))
.Switch()
.Subscribe(Console.WriteLine);
Внутри вашей задачи вам нужно позвонить OnNext
производить ценности. Возвращаемое значение задачи, если оно есть, игнорируется.