Как я могу предотвратить синхронные продолжения задачи?

У меня есть некоторый код библиотеки (сокет сети), который обеспечивает TaskAPI для ожидающих ответов на запросы, основанный на TaskCompletionSource<T>, Однако в TPL раздражает то, что невозможно предотвратить синхронные продолжения. То, что я хотел бы сделать, это либо:

  • рассказать TaskCompletionSource<T> то есть не должно позволять звонящим присоединяться к TaskContinuationOptions.ExecuteSynchronously, или же
  • установить результат (SetResult / TrySetResult) таким образом, который указывает, что TaskContinuationOptions.ExecuteSynchronously следует игнорировать, используя вместо этого пул

В частности, проблема, с которой я столкнулся, заключается в том, что входящие данные обрабатываются специальным читателем, и если вызывающий абонент может TaskContinuationOptions.ExecuteSynchronously они могут остановить читателя (что влияет не только на них). Ранее я работал над этим с помощью некоторого хакера, который обнаруживает наличие каких-либо продолжений, и, если они есть, выдвигает завершение на ThreadPoolоднако это оказывает существенное влияние, если вызывающий объект насыщен своей рабочей очередью, поскольку завершение не будет обработано своевременно. Если они используют Task.Wait() (или аналогичные), то они, по сути, зашли в тупик сами. Точно так же, именно поэтому читатель работает на выделенной ветке, а не на рабочих.

Так; прежде чем я попытаюсь повязать команду TPL: я пропускаю опцию?

Ключевые моменты:

  • Я не хочу, чтобы внешние абоненты могли украсть мою ветку
  • Я не могу использовать ThreadPool как реализация, так как она должна работать, когда пул насыщен

Пример ниже производит вывод (порядок может варьироваться в зависимости от времени):

Continuation on: Main thread
Press [return]
Continuation on: Thread pool

Проблема в том, что случайному звонящему удалось получить продолжение в "Главном потоке". В реальном коде это будет прерывать основной читатель; плохие вещи!

Код:

using System;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    static void Identify()
    {
        var thread = Thread.CurrentThread;
        string name = thread.IsThreadPoolThread
            ? "Thread pool" : thread.Name;
        if (string.IsNullOrEmpty(name))
            name = "#" + thread.ManagedThreadId;
        Console.WriteLine("Continuation on: " + name);
    }
    static void Main()
    {
        Thread.CurrentThread.Name = "Main thread";
        var source = new TaskCompletionSource<int>();
        var task = source.Task;
        task.ContinueWith(delegate {
            Identify();
        });
        task.ContinueWith(delegate {
            Identify();
        }, TaskContinuationOptions.ExecuteSynchronously);
        source.TrySetResult(123);
        Console.WriteLine("Press [return]");
        Console.ReadLine();
    }
}

6 ответов

Решение

Новое в.NET 4.6:

.NET 4.6 содержит новый TaskCreationOptions: RunContinuationsAsynchronously,


Поскольку вы готовы использовать Reflection для доступа к приватным полям...

Вы можете отметить задачу TCS с помощью TASK_STATE_THREAD_WAS_ABORTED флаг, который заставил бы все продолжения не быть встроенными.

const int TASK_STATE_THREAD_WAS_ABORTED = 134217728;

var stateField = typeof(Task).GetField("m_stateFlags", BindingFlags.NonPublic | BindingFlags.Instance);
stateField.SetValue(task, (int) stateField.GetValue(task) | TASK_STATE_THREAD_WAS_ABORTED);

Редактировать:

Вместо использования Reflection Emit, я предлагаю вам использовать выражения. Это намного удобнее для чтения и имеет преимущество в том, что оно совместимо с PCL:

var taskParameter = Expression.Parameter(typeof (Task));
const string stateFlagsFieldName = "m_stateFlags";
var setter =
    Expression.Lambda<Action<Task>>(
        Expression.Assign(Expression.Field(taskParameter, stateFlagsFieldName),
            Expression.Or(Expression.Field(taskParameter, stateFlagsFieldName),
                Expression.Constant(TASK_STATE_THREAD_WAS_ABORTED))), taskParameter).Compile();

Без использования Reflection:

Если кому-то интересно, я нашел способ сделать это без Reflection, но он тоже немного "грязный" и, конечно, несет немалый штраф за перфекцию:

try
{
    Thread.CurrentThread.Abort();
}
catch (ThreadAbortException)
{
    source.TrySetResult(123);
    Thread.ResetAbort();
}

Я не думаю, что в TPL есть что-то, что обеспечивало бы явный контроль API над TaskCompletionSource.SetResult продолжения. Я решил оставить свой первоначальный ответ для контроля этого поведения для async/await сценарии.

Вот еще одно решение, которое навязывает асинхронный ContinueWith если tcs.SetResult Продолжение происходит в том же потоке SetResult был вызван на:

public static class TaskExt
{
    static readonly ConcurrentDictionary<Task, Thread> s_tcsTasks =
        new ConcurrentDictionary<Task, Thread>();

    // SetResultAsync
    static public void SetResultAsync<TResult>(
        this TaskCompletionSource<TResult> @this,
        TResult result)
    {
        s_tcsTasks.TryAdd(@this.Task, Thread.CurrentThread);
        try
        {
            @this.SetResult(result);
        }
        finally
        {
            Thread thread;
            s_tcsTasks.TryRemove(@this.Task, out thread);
        }
    }

    // ContinueWithAsync, TODO: more overrides
    static public Task ContinueWithAsync<TResult>(
        this Task<TResult> @this,
        Action<Task<TResult>> action,
        TaskContinuationOptions continuationOptions = TaskContinuationOptions.None)
    {
        return @this.ContinueWith((Func<Task<TResult>, Task>)(t =>
        {
            Thread thread = null;
            s_tcsTasks.TryGetValue(t, out thread);
            if (Thread.CurrentThread == thread)
            {
                // same thread which called SetResultAsync, avoid potential deadlocks

                // using thread pool
                return Task.Run(() => action(t));

                // not using thread pool (TaskCreationOptions.LongRunning creates a normal thread)
                // return Task.Factory.StartNew(() => action(t), TaskCreationOptions.LongRunning);
            }
            else
            {
                // continue on the same thread
                var task = new Task(() => action(t));
                task.RunSynchronously();
                return Task.FromResult(task);
            }
        }), continuationOptions).Unwrap();
    }
}

Обновлено с учетом комментария:

Я не контролирую звонящих - я не могу заставить их использовать конкретный вариант продолжения: если бы я мог, проблема не существовала бы в первую очередь

Я не знал, что вы не контролируете звонящего. Тем не менее, если вы не контролируете это, вы, вероятно, не передаете TaskCompletionSource возражать напрямую к вызывающей стороне. По логике вещей, вы бы передавали часть токена, т.е. tcs.Task, В этом случае решение может быть еще проще, добавив еще один метод расширения к приведенному выше:

// ImposeAsync, TODO: more overrides
static public Task<TResult> ImposeAsync<TResult>(this Task<TResult> @this)
{
    return @this.ContinueWith(new Func<Task<TResult>, Task<TResult>>(antecedent =>
    {
        Thread thread = null;
        s_tcsTasks.TryGetValue(antecedent, out thread);
        if (Thread.CurrentThread == thread)
        {
            // continue on a pool thread
            return antecedent.ContinueWith(t => t, 
                TaskContinuationOptions.None).Unwrap();
        }
        else
        {
            return antecedent;
        }
    }), TaskContinuationOptions.ExecuteSynchronously).Unwrap();
}

Использование:

// library code
var source = new TaskCompletionSource<int>();
var task = source.Task.ImposeAsync();
// ... 

// client code
task.ContinueWith(delegate
{
    Identify();
}, TaskContinuationOptions.ExecuteSynchronously);

// ...
// library code
source.SetResultAsync(123);

Это на самом деле работает для обоих await а также ContinueWith ( скрипка) и без отражения взломов.

Что делать вместо того, чтобы делать

var task = source.Task;

вы делаете это вместо

var task = source.Task.ContinueWith<Int32>( x => x.Result );

Таким образом, вы всегда добавляете одно продолжение, которое будет выполняться асинхронно, и тогда не имеет значения, хотят ли подписчики продолжения в том же контексте. Это своего рода карри, не так ли?

Подход с имитацией прерывания выглядел действительно хорошо, но в некоторых случаях приводил к перехвату потоков TPL.

Затем у меня была реализация, которая была похожа на проверку объекта продолжения, но я просто проверял наличие любого продолжения, поскольку на самом деле слишком много сценариев, чтобы данный код работал нормально, но это означало, что даже такие вещи, как Task.Wait привел к поиску пула потоков.

В конечном счете, после проверки большого и большого количества IL, единственным безопасным и полезным сценарием является SetOnInvokeMres сценарий (продолжение вручную-событие-тонкое продолжение). Есть много других сценариев:

  • некоторые не безопасны, и приводят к угону потока
  • остальные бесполезны, так как в конечном итоге приводят к пулу потоков

В итоге я решил проверить ненулевой объект-продолжение; если это ноль, хорошо (без продолжений); если это не нуль, проверка специального случая для SetOnInvokeMres - если это так: хорошо (безопасно вызывать); в противном случае позвольте пулу потоков выполнить TrySetComplete без указания задачи сделать что-то особенное, например, отменить фальсификацию. Task.Wait использует SetOnInvokeMres подход, который является конкретным сценарием, который мы хотим изо всех сил стараться не тупить.

Type taskType = typeof(Task);
FieldInfo continuationField = taskType.GetField("m_continuationObject", BindingFlags.Instance | BindingFlags.NonPublic);
Type safeScenario = taskType.GetNestedType("SetOnInvokeMres", BindingFlags.NonPublic);
if (continuationField != null && continuationField.FieldType == typeof(object) && safeScenario != null)
{
    var method = new DynamicMethod("IsSyncSafe", typeof(bool), new[] { typeof(Task) }, typeof(Task), true);
    var il = method.GetILGenerator();
    var hasContinuation = il.DefineLabel();
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    Label nonNull = il.DefineLabel(), goodReturn = il.DefineLabel();
    // check if null
    il.Emit(OpCodes.Brtrue_S, nonNull);
    il.MarkLabel(goodReturn);
    il.Emit(OpCodes.Ldc_I4_1);
    il.Emit(OpCodes.Ret);

    // check if is a SetOnInvokeMres - if so, we're OK
    il.MarkLabel(nonNull);
    il.Emit(OpCodes.Ldarg_0);
    il.Emit(OpCodes.Ldfld, continuationField);
    il.Emit(OpCodes.Isinst, safeScenario);
    il.Emit(OpCodes.Brtrue_S, goodReturn);

    il.Emit(OpCodes.Ldc_I4_0);
    il.Emit(OpCodes.Ret);

    IsSyncSafe = (Func<Task, bool>)method.CreateDelegate(typeof(Func<Task, bool>));

Обновил, выложил отдельный ответ, чтобы разобраться с ContinueWith в отличие от await (так как ContinueWith не заботится о текущем контексте синхронизации).

Вы можете использовать тупой контекст синхронизации, чтобы наложить асинхронность на продолжение, вызванное вызовом SetResult/SetCancelled/SetException на TaskCompletionSource, Я считаю, что текущий контекст синхронизации (в точке await tcs.Task) является критерием, который TPL использует для принятия решения, делать ли такое продолжение синхронным или асинхронным.

Следующие работы для меня:

if (notifyAsync)
{
    tcs.SetResultAsync(null);
}
else
{
    tcs.SetResult(null);
}

SetResultAsync реализован так:

public static class TaskExt
{
    static public void SetResultAsync<T>(this TaskCompletionSource<T> tcs, T result)
    {
        FakeSynchronizationContext.Execute(() => tcs.SetResult(result));
    }

    // FakeSynchronizationContext
    class FakeSynchronizationContext : SynchronizationContext
    {
        private static readonly ThreadLocal<FakeSynchronizationContext> s_context =
            new ThreadLocal<FakeSynchronizationContext>(() => new FakeSynchronizationContext());

        private FakeSynchronizationContext() { }

        public static FakeSynchronizationContext Instance { get { return s_context.Value; } }

        public static void Execute(Action action)
        {
            var savedContext = SynchronizationContext.Current;
            SynchronizationContext.SetSynchronizationContext(FakeSynchronizationContext.Instance);
            try
            {
                action();
            }
            finally
            {
                SynchronizationContext.SetSynchronizationContext(savedContext);
            }
        }

        // SynchronizationContext methods

        public override SynchronizationContext CreateCopy()
        {
            return this;
        }

        public override void OperationStarted()
        {
            throw new NotImplementedException("OperationStarted");
        }

        public override void OperationCompleted()
        {
            throw new NotImplementedException("OperationCompleted");
        }

        public override void Post(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Post");
        }

        public override void Send(SendOrPostCallback d, object state)
        {
            throw new NotImplementedException("Send");
        }
    }
}

SynchronizationContext.SetSynchronizationContext это очень дешево с точки зрения накладных расходов, которые он добавляет. На самом деле, очень похожий подход используется при реализации WPF.Dispatcher.BeginInvoke,

TPL сравнивает целевой контекст синхронизации в точке await к точке tcs.SetResult, Если контекст синхронизации одинаков (или отсутствует контекст синхронизации в обоих местах), продолжение вызывается напрямую, синхронно. В противном случае он ставится в очередь с помощью SynchronizationContext.Post на целевой контекст синхронизации, т. е. нормальный await поведение. То, что делает этот подход, всегда навязывает SynchronizationContext.Post поведение (или продолжение потока пула, если нет целевого контекста синхронизации).

Обновлено, это не будет работать для task.ContinueWith, так как ContinueWith не заботится о текущем контексте синхронизации. Однако это работает для await task ( скрипка) Это также работает для await task.ConfigureAwait(false),

ОТО, этот подход работает для ContinueWith,

Если вы можете и готовы использовать рефлексию, это должно быть сделано;

public static class MakeItAsync
{
    static public void TrySetAsync<T>(this TaskCompletionSource<T> source, T result)
    {
        var continuation = typeof(Task).GetField("m_continuationObject", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
        var continuations = (List<object>)continuation.GetValue(source.Task);

        foreach (object c in continuations)
        {
            var option = c.GetType().GetField("m_options", BindingFlags.NonPublic | BindingFlags.GetField | BindingFlags.Instance);
            var options = (TaskContinuationOptions)option.GetValue(c);

            options &= ~TaskContinuationOptions.ExecuteSynchronously;
            option.SetValue(c, options);
        }

        source.TrySetResult(result);
    }        
}
Другие вопросы по тегам