Вызов TaskCompletionSource.SetResult неблокирующим образом.

Я обнаружил, что TaskCompletionSource.SetResult(); вызывает код, ожидающий задачу перед возвратом. В моем случае это приводит к тупику.

Это упрощенная версия, которая запускается в обычном Thread

void ReceiverRun()
    while (true)
    {
        var msg = ReadNextMessage();
        TaskCompletionSource<Response> task = requests[msg.RequestID];

        if(msg.Error == null)
            task.SetResult(msg);
        else
            task.SetException(new Exception(msg.Error));
    }
}

"Асинхронная" часть кода выглядит примерно так.

await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();

Ожидание фактически вложено в не асинхронные вызовы.

SendAwaitResponse(упрощенный)

public static Task<Response> SendAwaitResponse(string msg)
{
    var t = new TaskCompletionSource<Response>();
    requests.Add(GetID(msg), t);
    stream.Write(msg);
    return t.Task;
}

Я предполагал, что второй SendAwaitResponse будет выполняться в потоке ThreadPool, но он продолжается в потоке, созданном для ReceiverRun.

Есть ли способ установить результат задачи без продолжения ожидаемого кода?

Приложение представляет собой консольное приложение.

3 ответа

Решение

Я обнаружил, что TaskCompletionSource.SetResult(); вызывает код, ожидающий задачу перед возвратом. В моем случае это приводит к тупику.

Да, у меня есть запись в блоге, документирующая это (AFAIK это не задокументировано в MSDN). Тупик возникает из-за двух вещей:

  1. Там смесь async и блокирующий код (т.е. async метод вызывает Wait).
  2. Продолжение задачи запланировано с использованием TaskContinuationOptions.ExecuteSynchronously,

Я рекомендую начать с самого простого возможного решения: удалить первое (1). Т.е. не смешивай async а также Wait звонки:

await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();

Вместо этого используйте await последовательно:

await SendAwaitResponse("first message");
await SendAwaitResponse("second message");

Если вам нужно, вы можете Wait в альтернативной точке дальше вверх по стеку вызовов (не в async метод).

Это мое самое рекомендуемое решение. Однако, если вы хотите попробовать удалить вторую вещь (2), вы можете сделать пару трюков: либо обернуть SetResult в Task.Run заставить его в отдельный поток (моя библиотека AsyncEx имеет *WithBackgroundContinuations методы расширения, которые делают именно это), или дают вашему потоку фактический контекст (такой как мой AsyncContext введите) и укажите ConfigureAwait(false), что заставит продолжение игнорировать ExecuteSynchronously флаг.

Но эти решения гораздо сложнее, чем просто разделение async и код блокировки.

Как примечание, взгляните на поток данных TPL; Похоже, вы можете найти это полезным.

Поскольку ваше приложение является консольным приложением, оно выполняется в контексте синхронизации по умолчанию, где await Продолжительный обратный вызов будет вызван в том же потоке, на котором завершена ожидающая задача. Если вы хотите переключать темы после await SendAwaitResponseВы можете сделать это с await Task.Yield():

await SendAwaitResponse("first message");
await Task.Yield(); 
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock

Вы могли бы улучшить это, сохранив Thread.CurrentThread.ManagedThreadId внутри Task.Result и сравнивая его с идентификатором текущего потока после await, Если вы все еще в той же теме, сделайте await Task.Yield(),

Пока я так понимаю SendAwaitResponse Это упрощенная версия вашего фактического кода, он по-прежнему полностью синхронен внутри (как вы показали в своем вопросе). Почему вы ожидаете какого-либо переключателя потока там?

В любом случае, вы, вероятно, должны изменить свою логику так, чтобы она не делала предположений о том, в каком потоке вы находитесь. Избегайте смешивания await а также Task.Wait() и сделать весь ваш код асинхронным. Обычно можно придерживаться только одного Wait() где-то на верхнем уровне (например, внутри Main).

[ИЗДАНО] Вызов task.SetResult(msg) от ReceiverRun фактически передает поток управления в точку, где вы await на task - без переключения потоков из-за поведения контекста синхронизации по умолчанию. Итак, ваш код, который выполняет фактическую обработку сообщений, берет на себя ReceiverRun нить. В конце концов, SendAwaitResponse("second message").Wait() вызывается в том же потоке, вызывая тупик.

Ниже приведен код консольного приложения, смоделированный по образцу. Оно использует await Task.Yield() внутри ProcessAsync запланировать продолжение в отдельном потоке, чтобы поток управления вернулся к ReceiverRun и нет тупика

using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        class Worker
        {
            public struct Response
            {
                public string message;
                public int threadId;
            }

            CancellationToken _token;
            readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
            readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();

            public Worker(CancellationToken token)
            {
                _token = token;
            }

            string ReadNextMessage()
            {
                // using Thread.Sleep(100) for test purposes here,
                // should be using ManualResetEvent (or similar synchronization primitive),
                // depending on how messages arrive
                string message;
                while (!_messages.TryDequeue(out message))
                {
                    Thread.Sleep(100);
                    _token.ThrowIfCancellationRequested();
                }
                return message;
            }

            public void ReceiverRun()
            {
                LogThread("Enter ReceiverRun");
                while (true)
                {
                    var msg = ReadNextMessage();
                    LogThread("ReadNextMessage: " + msg);
                    var tcs = _requests[msg];
                    tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
                    _token.ThrowIfCancellationRequested(); // this is how we terminate the loop
                }
            }

            Task<Response> SendAwaitResponse(string msg)
            {
                LogThread("SendAwaitResponse: " + msg);
                var tcs = new TaskCompletionSource<Response>();
                _requests.TryAdd(msg, tcs);
                _messages.Enqueue(msg);
                return tcs.Task;
            }

            public async Task ProcessAsync()
            {
                LogThread("Enter Worker.ProcessAsync");

                var task1 = SendAwaitResponse("first message");
                await task1;
                LogThread("result1: " + task1.Result.message);
                // avoid deadlock for task2.Wait() with Task.Yield()
                // comment this out and task2.Wait() will dead-lock
                if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task2 = SendAwaitResponse("second message");
                task2.Wait();
                LogThread("result2: " + task2.Result.message);

                var task3 = SendAwaitResponse("third message");
                // still on the same thread as with result 2, no deadlock for task3.Wait()
                task3.Wait();
                LogThread("result3: " + task3.Result.message);

                var task4 = SendAwaitResponse("fourth message");
                await task4;
                LogThread("result4: " + task4.Result.message);
                // avoid deadlock for task5.Wait() with Task.Yield()
                // comment this out and task5.Wait() will dead-lock
                if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task5 = SendAwaitResponse("fifth message");
                task5.Wait();
                LogThread("result5: " + task5.Result.message);

                LogThread("Leave Worker.ProcessAsync");
            }

            public static void LogThread(string message)
            {
                Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
            }
        }

        static void Main(string[] args)
        {
            Worker.LogThread("Enter Main");
            var cts = new CancellationTokenSource(5000); // cancel after 5s
            var worker = new Worker(cts.Token);
            Task receiver = Task.Run(() => worker.ReceiverRun());
            Task main = worker.ProcessAsync();
            try
            {
                Task.WaitAll(main, receiver);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: " + e.Message);
            }
            Worker.LogThread("Leave Main");
            Console.ReadLine();
        }
    }
}

Это не сильно отличается от выполнения Task.Run(() => task.SetResult(msg)) внутри ReceiverRun, Единственное преимущество, о котором я могу подумать, это то, что вы имеете явный контроль над тем, когда переключать потоки. Таким образом, вы можете оставаться в одной теме как можно дольше (например, для task2, task3, task4, но вам все еще нужен другой переключатель потока после task4 чтобы избежать тупика на task5.Wait()).

Оба решения в конечном итоге приведут к увеличению пула потоков, что плохо с точки зрения производительности и масштабируемости.

Теперь, если мы заменим task.Wait()с await task везде внутри ProcessAsync в приведенном выше коде нам не придется использовать await Task.Yield и по-прежнему не будет тупиков. Тем не менее, вся цепочка await звонки после 1-го await task1 внутри ProcessAsyncбудет фактически выполнен на ReceiverRunнить. Пока мы не заблокируем эту тему с другими Wait()в стиле вызовов и не выполняет большую часть работы с процессором, поскольку мы обрабатываем сообщения, этот подход может работать нормально (асинхронный ввод-выводawaitвызовы -style все еще должны быть в порядке, и они могут фактически вызывать неявное переключение потока).

Тем не менее, я думаю, что вам понадобится отдельный поток с установленным контекстом синхронизации сериализации для обработки сообщений (аналогично WindowsFormsSynchronizationContext). Вот где ваш асинхронный код, содержащий awaits должен бежать. Вам все равно нужно избегать использованияTask.Waitв этой теме. И если отдельная обработка сообщений занимает много работы с процессором, вы должны использоватьTask.Runза такую ​​работу. Для асинхронных вызовов, связанных с вводом-выводом, вы можете остаться в том же потоке.

Вы можете посмотреть на ActionDispatcher/ActionDispatcherSynchronizationContextиз библиотеки @StephenCleary Nito Asynchronous для вашей логики асинхронной обработки сообщений. Надеюсь, Стивен вскочил и дал лучший ответ.

"Я предполагал, что второй SendAwaitResponse будет выполняться в потоке ThreadPool, но он продолжается в потоке, созданном для ReceiverRun".

Это полностью зависит от того, что вы делаете в SendAwaitResponse. Асинхронность и параллелизм не одно и то же.

Проверьте: C# 5 Async / Await - это * одновременный *?

Немного опоздал на вечеринку, но вот мое решение, которое я считаю добавленной стоимостью.

Я боролся с этим также, я решил это, захватив SynchronizationContext на ожидаемый метод.

Это будет выглядеть примерно так:

// just a default sync context
private readonly SynchronizationContext _defaultContext = new SynchronizationContext();

void ReceiverRun()
{
    while (true)    // <-- i would replace this with a cancellation token
    {
        var msg = ReadNextMessage();
        TaskWithContext<TResult> task = requests[msg.RequestID];

        // if it wasn't a winforms/wpf thread, it would be null
        // we choose our default context (threadpool)
        var context = task.Context ?? _defaultContext;

        // execute it on the context which was captured where it was added. So it won't get completed on this thread.
        context.Post(state =>
        {
            if (msg.Error == null)
                task.TaskCompletionSource.SetResult(msg);
            else
                task.TaskCompletionSource.SetException(new Exception(msg.Error));
        });
    }
}

public static Task<Response> SendAwaitResponse(string msg)
{
    // The key is here! Save the current synchronization context.
    var t = new TaskWithContext<Response>(SynchronizationContext.Current); 

    requests.Add(GetID(msg), t);
    stream.Write(msg);
    return t.TaskCompletionSource.Task;
}

// class to hold a task and context
public class TaskWithContext<TResult>
{
    public SynchronizationContext Context { get; }

    public TaskCompletionSource<TResult> TaskCompletionSource { get; } = new TaskCompletionSource<Response>();

    public TaskWithContext(SynchronizationContext context)
    {
        Context = context;
    }
}
Другие вопросы по тегам