Вызов TaskCompletionSource.SetResult неблокирующим образом.
Я обнаружил, что TaskCompletionSource.SetResult();
вызывает код, ожидающий задачу перед возвратом. В моем случае это приводит к тупику.
Это упрощенная версия, которая запускается в обычном Thread
void ReceiverRun()
while (true)
{
var msg = ReadNextMessage();
TaskCompletionSource<Response> task = requests[msg.RequestID];
if(msg.Error == null)
task.SetResult(msg);
else
task.SetException(new Exception(msg.Error));
}
}
"Асинхронная" часть кода выглядит примерно так.
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
Ожидание фактически вложено в не асинхронные вызовы.
SendAwaitResponse(упрощенный)
public static Task<Response> SendAwaitResponse(string msg)
{
var t = new TaskCompletionSource<Response>();
requests.Add(GetID(msg), t);
stream.Write(msg);
return t.Task;
}
Я предполагал, что второй SendAwaitResponse будет выполняться в потоке ThreadPool, но он продолжается в потоке, созданном для ReceiverRun.
Есть ли способ установить результат задачи без продолжения ожидаемого кода?
Приложение представляет собой консольное приложение.
3 ответа
Я обнаружил, что TaskCompletionSource.SetResult(); вызывает код, ожидающий задачу перед возвратом. В моем случае это приводит к тупику.
Да, у меня есть запись в блоге, документирующая это (AFAIK это не задокументировано в MSDN). Тупик возникает из-за двух вещей:
- Там смесь
async
и блокирующий код (т.е.async
метод вызываетWait
). - Продолжение задачи запланировано с использованием
TaskContinuationOptions.ExecuteSynchronously
,
Я рекомендую начать с самого простого возможного решения: удалить первое (1). Т.е. не смешивай async
а также Wait
звонки:
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
Вместо этого используйте await
последовательно:
await SendAwaitResponse("first message");
await SendAwaitResponse("second message");
Если вам нужно, вы можете Wait
в альтернативной точке дальше вверх по стеку вызовов (не в async
метод).
Это мое самое рекомендуемое решение. Однако, если вы хотите попробовать удалить вторую вещь (2), вы можете сделать пару трюков: либо обернуть SetResult
в Task.Run
заставить его в отдельный поток (моя библиотека AsyncEx имеет *WithBackgroundContinuations
методы расширения, которые делают именно это), или дают вашему потоку фактический контекст (такой как мой AsyncContext
введите) и укажите ConfigureAwait(false)
, что заставит продолжение игнорировать ExecuteSynchronously
флаг.
Но эти решения гораздо сложнее, чем просто разделение async
и код блокировки.
Как примечание, взгляните на поток данных TPL; Похоже, вы можете найти это полезным.
Поскольку ваше приложение является консольным приложением, оно выполняется в контексте синхронизации по умолчанию, где await
Продолжительный обратный вызов будет вызван в том же потоке, на котором завершена ожидающая задача. Если вы хотите переключать темы после await SendAwaitResponse
Вы можете сделать это с await Task.Yield()
:
await SendAwaitResponse("first message");
await Task.Yield();
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock
Вы могли бы улучшить это, сохранив Thread.CurrentThread.ManagedThreadId
внутри Task.Result
и сравнивая его с идентификатором текущего потока после await
, Если вы все еще в той же теме, сделайте await Task.Yield()
,
Пока я так понимаю SendAwaitResponse
Это упрощенная версия вашего фактического кода, он по-прежнему полностью синхронен внутри (как вы показали в своем вопросе). Почему вы ожидаете какого-либо переключателя потока там?
В любом случае, вы, вероятно, должны изменить свою логику так, чтобы она не делала предположений о том, в каком потоке вы находитесь. Избегайте смешивания await
а также Task.Wait()
и сделать весь ваш код асинхронным. Обычно можно придерживаться только одного Wait()
где-то на верхнем уровне (например, внутри Main
).
[ИЗДАНО] Вызов task.SetResult(msg)
от ReceiverRun
фактически передает поток управления в точку, где вы await
на task
- без переключения потоков из-за поведения контекста синхронизации по умолчанию. Итак, ваш код, который выполняет фактическую обработку сообщений, берет на себя ReceiverRun
нить. В конце концов, SendAwaitResponse("second message").Wait()
вызывается в том же потоке, вызывая тупик.
Ниже приведен код консольного приложения, смоделированный по образцу. Оно использует await Task.Yield()
внутри ProcessAsync
запланировать продолжение в отдельном потоке, чтобы поток управления вернулся к ReceiverRun
и нет тупика
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
class Program
{
class Worker
{
public struct Response
{
public string message;
public int threadId;
}
CancellationToken _token;
readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();
public Worker(CancellationToken token)
{
_token = token;
}
string ReadNextMessage()
{
// using Thread.Sleep(100) for test purposes here,
// should be using ManualResetEvent (or similar synchronization primitive),
// depending on how messages arrive
string message;
while (!_messages.TryDequeue(out message))
{
Thread.Sleep(100);
_token.ThrowIfCancellationRequested();
}
return message;
}
public void ReceiverRun()
{
LogThread("Enter ReceiverRun");
while (true)
{
var msg = ReadNextMessage();
LogThread("ReadNextMessage: " + msg);
var tcs = _requests[msg];
tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
_token.ThrowIfCancellationRequested(); // this is how we terminate the loop
}
}
Task<Response> SendAwaitResponse(string msg)
{
LogThread("SendAwaitResponse: " + msg);
var tcs = new TaskCompletionSource<Response>();
_requests.TryAdd(msg, tcs);
_messages.Enqueue(msg);
return tcs.Task;
}
public async Task ProcessAsync()
{
LogThread("Enter Worker.ProcessAsync");
var task1 = SendAwaitResponse("first message");
await task1;
LogThread("result1: " + task1.Result.message);
// avoid deadlock for task2.Wait() with Task.Yield()
// comment this out and task2.Wait() will dead-lock
if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task2 = SendAwaitResponse("second message");
task2.Wait();
LogThread("result2: " + task2.Result.message);
var task3 = SendAwaitResponse("third message");
// still on the same thread as with result 2, no deadlock for task3.Wait()
task3.Wait();
LogThread("result3: " + task3.Result.message);
var task4 = SendAwaitResponse("fourth message");
await task4;
LogThread("result4: " + task4.Result.message);
// avoid deadlock for task5.Wait() with Task.Yield()
// comment this out and task5.Wait() will dead-lock
if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
await Task.Yield();
var task5 = SendAwaitResponse("fifth message");
task5.Wait();
LogThread("result5: " + task5.Result.message);
LogThread("Leave Worker.ProcessAsync");
}
public static void LogThread(string message)
{
Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
}
}
static void Main(string[] args)
{
Worker.LogThread("Enter Main");
var cts = new CancellationTokenSource(5000); // cancel after 5s
var worker = new Worker(cts.Token);
Task receiver = Task.Run(() => worker.ReceiverRun());
Task main = worker.ProcessAsync();
try
{
Task.WaitAll(main, receiver);
}
catch (Exception e)
{
Console.WriteLine("Exception: " + e.Message);
}
Worker.LogThread("Leave Main");
Console.ReadLine();
}
}
}
Это не сильно отличается от выполнения Task.Run(() => task.SetResult(msg))
внутри ReceiverRun
, Единственное преимущество, о котором я могу подумать, это то, что вы имеете явный контроль над тем, когда переключать потоки. Таким образом, вы можете оставаться в одной теме как можно дольше (например, для task2
, task3
, task4
, но вам все еще нужен другой переключатель потока после task4
чтобы избежать тупика на task5.Wait()
).
Оба решения в конечном итоге приведут к увеличению пула потоков, что плохо с точки зрения производительности и масштабируемости.
Теперь, если мы заменим task.Wait()
с await task
везде внутри ProcessAsync
в приведенном выше коде нам не придется использовать await Task.Yield
и по-прежнему не будет тупиков. Тем не менее, вся цепочка await
звонки после 1-го await task1
внутри ProcessAsync
будет фактически выполнен на ReceiverRun
нить. Пока мы не заблокируем эту тему с другими Wait()
в стиле вызовов и не выполняет большую часть работы с процессором, поскольку мы обрабатываем сообщения, этот подход может работать нормально (асинхронный ввод-выводawait
вызовы -style все еще должны быть в порядке, и они могут фактически вызывать неявное переключение потока).
Тем не менее, я думаю, что вам понадобится отдельный поток с установленным контекстом синхронизации сериализации для обработки сообщений (аналогично WindowsFormsSynchronizationContext
). Вот где ваш асинхронный код, содержащий awaits
должен бежать. Вам все равно нужно избегать использованияTask.Wait
в этой теме. И если отдельная обработка сообщений занимает много работы с процессором, вы должны использоватьTask.Run
за такую работу. Для асинхронных вызовов, связанных с вводом-выводом, вы можете остаться в том же потоке.
Вы можете посмотреть на ActionDispatcher
/ActionDispatcherSynchronizationContext
из библиотеки @StephenCleary Nito Asynchronous для вашей логики асинхронной обработки сообщений. Надеюсь, Стивен вскочил и дал лучший ответ.
"Я предполагал, что второй SendAwaitResponse будет выполняться в потоке ThreadPool, но он продолжается в потоке, созданном для ReceiverRun".
Это полностью зависит от того, что вы делаете в SendAwaitResponse. Асинхронность и параллелизм не одно и то же.
Проверьте: C# 5 Async / Await - это * одновременный *?
Немного опоздал на вечеринку, но вот мое решение, которое я считаю добавленной стоимостью.
Я боролся с этим также, я решил это, захватив SynchronizationContext на ожидаемый метод.
Это будет выглядеть примерно так:
// just a default sync context
private readonly SynchronizationContext _defaultContext = new SynchronizationContext();
void ReceiverRun()
{
while (true) // <-- i would replace this with a cancellation token
{
var msg = ReadNextMessage();
TaskWithContext<TResult> task = requests[msg.RequestID];
// if it wasn't a winforms/wpf thread, it would be null
// we choose our default context (threadpool)
var context = task.Context ?? _defaultContext;
// execute it on the context which was captured where it was added. So it won't get completed on this thread.
context.Post(state =>
{
if (msg.Error == null)
task.TaskCompletionSource.SetResult(msg);
else
task.TaskCompletionSource.SetException(new Exception(msg.Error));
});
}
}
public static Task<Response> SendAwaitResponse(string msg)
{
// The key is here! Save the current synchronization context.
var t = new TaskWithContext<Response>(SynchronizationContext.Current);
requests.Add(GetID(msg), t);
stream.Write(msg);
return t.TaskCompletionSource.Task;
}
// class to hold a task and context
public class TaskWithContext<TResult>
{
public SynchronizationContext Context { get; }
public TaskCompletionSource<TResult> TaskCompletionSource { get; } = new TaskCompletionSource<Response>();
public TaskWithContext(SynchronizationContext context)
{
Context = context;
}
}