Отмена обновления асинхронного свойства

Буду признателен за помощь в решении следующей проблемы: есть свойство в моем классе, скажем,

string Foo {get;set;}

В классе есть функция обновления. Внутри есть длительный метод, который обновляет

Foo = await Task.Run()... etc. 

Как я могу избежать суммирования Task-s, когда 1000 Refresh вызывается в секунду? Дребезг? Троттлинг? Как это сделать? Rx доступен в проекте, и я использую ядро ​​dotnet 2.2.

Конструктор классов


    res = Observable.FromAsync(() => Task.Run(async () =>
    {
                       await Task.Delay(5000);
                       return "Example";
    }
    )).Throttle(TimeSpan.FromSeconds(10));

Учебный класс


    private IObservable<string> res;

    public string Foo
    {
                get => _foo;
                set
                {
                    this.RaiseAndSetIfChanged(ref _foo, value);
                }
    }

    public void RefreshFoo()
    {
                res.Subscribe(x => Foo = x);
    }

2 ответа

Решение

Если вы можете использовать другой пакет, я бы предложил ReactiveUI и его ReactiveCommand, который будет обрабатывать все ваши проблемы из коробки:

  var command = ReactiveCommand.CreateFromTask(async () =>
            { // define the work
                Console.WriteLine("Executing at " + DateTime.Now);
                await Task.Delay(1000);
                return "test";
            });

            command.Subscribe(res => {
                // do something with the result: assign to property
            });

            var d = Observable.Interval(TimeSpan.FromMilliseconds(500), RxApp.TaskpoolScheduler) // you can specify scheduler if you want
                .Do(_ => Console.WriteLine("Invoking at " + DateTime.Now))
                .Select(x => Unit.Default) // command argument type is Unit
                .InvokeCommand(command); // this checks command.CanExecute which is false while it is executing

выходы:

Invoking at 2019-01-22 13:34:04
Executing at 2019-01-22 13:34:04
Invoking at 2019-01-22 13:34:05
Invoking at 2019-01-22 13:34:05
Executing at 2019-01-22 13:34:05

Я знаю, что пакет в основном используется для разработки пользовательского интерфейса, но имеет несколько приятных трюков, таких как ReactiveCommand, который вы можете использовать везде.

Остерегайтесь этого await command.Execute() по умолчанию не проверяет, может ли команда быть выполнена.

Я думаю, что это гораздо более читабельно, чем ваше решение.

Наконец я закончил с этим решением.

Работает так:

  • Сразу начинается при вызове
  • Больше вызовов отменено, но в результате они снова запускают обновление после его завершения.
  • LongRunning работает над фоновыми потоками и работает с каждым элементом.
  • Два элемента обновляются одновременно, чтобы избежать голодания в ThreadPool

Пример кода:

private long _refreshCalls;

public async Task RefreshHistoriesAsync()
{
    try
    {
        if (Interlocked.Read(ref _refreshCalls) == 2) //it is running and scheduled to rerun after finished
        {
            return;
        }
        if (Interlocked.Read(ref _refreshCalls) == 1) //it is running but now we will rerun if finished
        {
            Interlocked.Increment(ref _refreshCoinCalls);
            return;
        }
        if (Interlocked.Read(ref _refreshCalls) == 0) //it is not running so we start the work
        {
            Interlocked.Increment(ref _refreshCalls);
        }
        const int simultaneousThread = 2; //threads allowed to run simultaneously in threadpool

        await Task.Run(() => Parallel.ForEach(myListOfItemsToWorkOn, new ParallelOptions { MaxDegreeOfParallelism = simultaneousThread }, item =>
                {
                    LongRunningWork();
                }));

        if (Interlocked.Read(ref _refreshCoinCalls) == 2) //scheduled to rerun so we start the work again
        {
            Interlocked.Exchange(ref _refreshCoinCalls, 0);
            RefreshHistoriesAsync();
        }
        if (Interlocked.Read(ref _refreshCoinCalls) == 1) //done with the job
        {
            Interlocked.Exchange(ref _refreshCoinCalls, 0);
        }
    }
    catch (Exception ex)
    {
        Interlocked.Exchange(ref _refreshCoinCalls, 0);
        //Log error
    }
}
Другие вопросы по тегам