StaTaskScheduler и прокачка сообщений потока STA

TL;DR: тупик внутри задачи, выполняемойStaTaskScheduler , Длинная версия:

я используюStaTaskScheduler из ParallelExtensionsExtras от Parallel Team, для размещения некоторых устаревших COM-объектов STA, предоставленных третьей стороной. Описание StaTaskScheduler Детали реализации говорят о следующем:

Хорошая новость заключается в том, что реализация TPL может работать в потоках MTA или STA и учитывать соответствующие различия между базовыми API-интерфейсами, такими как WaitHandle.WaitAll (который поддерживает потоки MTA только тогда, когда в методе предусмотрено несколько дескрипторов ожидания).

Я думал, что это будет означать, что блокирующие части TPL будут использовать API ожидания, который качает сообщения, как CoWaitForMultipleHandles, чтобы избежать тупиковых ситуаций при вызове в потоке STA.

В моей ситуации, я полагаю, происходит следующее: COM-объект A STA in-proc делает вызов объекту B-out-of-proc, а затем ожидает обратный вызов от B via как часть исходящего вызова.

В упрощенном виде:

var result = await Task.Factory.StartNew(() =>
{
    // in-proc object A
    var a = new A(); 
    // out-of-proc object B
    var b = new B(); 
    // A calls B and B calls back A during the Method call
    return a.Method(b);     
}, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);

Проблема в,a.Method(b)никогда не вернется. Насколько я могу судить, это происходит из-за ожидания блокировки где-то внутриBlockingCollection<Task>не качает сообщения, поэтому мое предположение о цитируемом утверждении, вероятно, неверно.

EDITED Тот же код работает, когда выполняется в потоке пользовательского интерфейса тестового приложения WinForms (то естьTaskScheduler.FromCurrentSynchronizationContext()вместоstaTaskScheduler в Task.Factory.StartNew).

Как правильно решить эту проблему? Должен ли я реализовать пользовательский контекст синхронизации, который будет явно качать сообщения с CoWaitForMultipleHandlesи установите его в каждом потоке STA, запущенномStaTaskScheduler?

Если да, будет ли основная реализация BlockingCollectionзвоню моемуSynchronizationContext.Waitметод? Могу ли я использовать SynchronizationContext.WaitHelper реализовать SynchronizationContext.Wait?


Отредактировано с некоторым кодом, показывающим, что управляемый поток STA не качает, когда делает ожидание блокировки. Код представляет собой законченное консольное приложение, готовое для копирования / вставки / запуска:

using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleTestApp
{
    class Program
    {
        // start and run an STA thread
        static void RunStaThread(bool pump)
        {
            // test a blocking wait with BlockingCollection.Take
            var tasks = new BlockingCollection<Task>();

            var thread = new Thread(() => 
            {
                // Create a simple Win32 window 
                var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
                    0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);

                // subclass it with a custom WndProc
                IntPtr prevWndProc = IntPtr.Zero;

                var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
                {
                    if (msg == NativeMethods.WM_TEST)
                        Console.WriteLine("WM_TEST processed");
                    return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
                });

                prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
                if (prevWndProc == IntPtr.Zero)
                    throw new ApplicationException();

                // post a test WM_TEST message to it
                NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);

                // BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
                try { var task = tasks.Take(); }
                catch (Exception e) { Console.WriteLine(e.Message); }

                if (pump)
                {
                    // NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
                    Console.WriteLine("Now start pumping...");
                    NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
                }
            });

            thread.SetApartmentState(ApartmentState.STA);
            thread.Start();

            Thread.Sleep(2000);

            // this causes the STA thread to end
            tasks.CompleteAdding(); 

            thread.Join();
        }

        static void Main(string[] args)
        {
            Console.WriteLine("Testing without pumping...");
            RunStaThread(false);

            Console.WriteLine("\nTest with pumping...");
            RunStaThread(true);

            Console.WriteLine("Press Enter to exit");
            Console.ReadLine();
        }
    }

    // Interop
    static class NativeMethods
    {
        [DllImport("user32")]
        public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc);

        [DllImport("user32")]
        public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam);

        [DllImport("user32.dll")]
        public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);

        [DllImport("user32.dll")]
        public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);

        [DllImport("user32.dll")]
        public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);

        public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam);

        public const int GWL_WNDPROC = -4;
        public const int WS_POPUP = unchecked((int)0x80000000);
        public const int WM_USER = 0x0400;

        public const int WM_TEST = WM_USER + 1;
    }
}

Это производит вывод:

Тестирование без прокачки...
Аргумент коллекции является пустым и был отмечен как завершенный в отношении дополнений. Тест с прокачкой...
Аргумент коллекции является пустым и был отмечен как завершенный в отношении дополнений.
Теперь начните прокачивать...
WM_TEST обработано
Нажмите Enter, чтобы выйти

2 ответа

Решение

Мое понимание вашей проблемы: вы используете StaTaskScheduler Только организовать классическую квартиру COM STA для ваших традиционных объектов COM. Вы не запускаете цикл сообщений ядра WinForms или WPF в потоке STA StaTaskScheduler, То есть вы не используете ничего подобного Application.Run, Application.DoEvents или же Dispatcher.PushFrame внутри этой темы. Поправьте меня, если это неверное предположение.

Само по себе StaTaskScheduler не устанавливает никакого контекста синхронизации в потоках STA, которые он создает. Таким образом, вы полагаетесь на CLR для прокачки сообщений для вас. Я нашел только неявное подтверждение того, что Крис Брумме накачивает CLR в потоках STA, в квартирах и накачках в CLR:

Я продолжаю говорить, что управляемая блокировка будет выполнять "некоторую накачку" при вызове в потоке STA. Разве не было бы здорово точно знать, что будет прокачиваться? К сожалению, прокачка - это черное искусство, которое невозможно понять смертным. На Win2000 и выше мы просто делегируем OLE32 сервис CoWaitForMultipleHandles.

Это указывает на использование CLR CoWaitForMultipleHandles внутренне для нитей STA. Кроме того, документы MSDN для COWAIT_DISPATCH_WINDOW_MESSAGES флаг упомянуть это:

... в STA отправляется только небольшой набор специальных сообщений.

Я провел некоторое исследование по этому вопросу, но не смог добраться до WM_TEST из вашего примера кода с CoWaitForMultipleHandles Мы обсуждали это в комментариях к вашему вопросу. Насколько я понимаю, вышеупомянутый небольшой набор специальных сообщений действительно ограничен некоторыми сообщениями, относящимися к COM-маршаллеру, и не включает никаких обычных сообщений общего назначения, таких как ваши WM_TEST,

Итак, чтобы ответить на ваш вопрос:

... Должен ли я реализовать пользовательский контекст синхронизации, который будет явно накачивать сообщения с CoWaitForMultipleHandles и устанавливать его в каждом потоке STA, запущенном StaTaskScheduler?

Да, я считаю, что создание собственного контекста синхронизации и переопределение SynchronizationContext.Wait это действительно правильное решение.

Тем не менее, вы должны избегать использования CoWaitForMultipleHandles и использовать MsgWaitForMultipleObjectsEx вместо. Если MsgWaitForMultipleObjectsEx указывает на наличие ожидающего сообщения в очереди, вы должны вручную добавить его PeekMessage(PM_REMOVE) а также DispatchMessage, Тогда вы должны продолжать ждать ручки, все внутри одинаково SynchronizationContext.Wait вызов.

Обратите внимание, что есть тонкое, но важное различие между MsgWaitForMultipleObjectsEx а также MsgWaitForMultipleObjects, Последний не возвращается и продолжает блокировать, если в очереди уже есть сообщение (например, с PeekMessage(PM_NOREMOVE) или же GetQueueStatus), но не удаляется. Это плохо для прокачки, потому что ваши COM-объекты могут использовать что-то вроде PeekMessage проверить очередь сообщений. Это может позже привести MsgWaitForMultipleObjects блокировать, когда не ожидается.

Ото, MsgWaitForMultipleObjectsEx с MWMO_INPUTAVAILABLE Флаг не имеет такого недостатка, и в этом случае вернется.

Некоторое время назад я создал собственную версию StaTaskScheduler ( доступно здесь как ThreadAffinityTaskScheduler) в попытке решить другую проблему: поддержание пула потоков со схожестью потоков для последующего await продолжения. Привязка потоков жизненно важна, если вы используете STA COM-объекты в нескольких awaits, Оригинал StaTaskScheduler проявляет это поведение только тогда, когда его пул ограничен 1 потоком.

Поэтому я продолжил и поэкспериментировал с вашим WM_TEST дело. Первоначально я установил экземпляр стандарта SynchronizationContext класс в потоке STA. WM_TEST сообщение не получило, что ожидалось.

Тогда я переопределил SynchronizationContext.Wait просто переслать SynchronizationContext.WaitHelper, Это действительно вызвали, но все еще не качало.

Наконец, я реализовал полнофункциональный цикл обработки сообщений, вот его основная часть:

// the core loop
var msg = new NativeMethods.MSG();
while (true)
{
    // MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
    // even if there's a message already seen but not removed in the message queue
    nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
        count, waitHandles,
        (uint)remainingTimeout,
        QS_MASK,
        NativeMethods.MWMO_INPUTAVAILABLE);

    if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
        return managedResult;

    // there is a message, pump and dispatch it
    if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
    {
        NativeMethods.TranslateMessage(ref msg);
        NativeMethods.DispatchMessage(ref msg);
    }
    if (hasTimedOut())
        return WaitHandle.WaitTimeout;
}

Это работает, WM_TEST получает насос. Ниже приведена адаптированная версия вашего теста:

public static async Task RunAsync()
{
    using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
    {
        Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
        await staThread.Run(async () =>
        {
            Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
            // create a simple Win32 window
            IntPtr hwnd = CreateTestWindow();

            // Post some WM_TEST messages
            Console.WriteLine("Post some WM_TEST messages...");
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
            NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
            Console.WriteLine("Press Enter to continue...");
            await ReadLineAsync();

            Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
            Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));

            Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
        }, CancellationToken.None);
    }
    Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
}

Выход:

Начальная тема № 9
На ните № 10
Опубликовать несколько сообщений WM_TEST...
Нажмите Enter, чтобы продолжить...
WM_TEST обработано: 1
WM_TEST обработано: 2
WM_TEST обработано: 3

После ожидания, нить № 10
Ожидающие сообщения в очереди: False
Выход из потока STA № 10
Текущая тема #12
нажмите любую клавишу для выхода 

Обратите внимание, что эта реализация поддерживает как сходство потоков (оно остается в потоке #10 после await) и сообщение прокачка. Полный исходный код содержит повторно используемые части (ThreadAffinityTaskScheduler а также ThreadWithAffinityContext) и доступен здесь как отдельное консольное приложение. Он не был тщательно протестирован, поэтому используйте его на свой страх и риск.

Тема прокачки резьбы STA - это большая тема, и очень немногие программисты имеют приятное время для решения тупиковых ситуаций. Оригинальный документ об этом был написан Крисом Браммом, главным умным парнем, который работал на.NET. Вы найдете это в этом сообщении в блоге. К сожалению, это довольно мало по специфике, он не выходит за пределы того, что CLR немного прокачивает, но без каких-либо подробностей о точных правилах.

Код, о котором он говорит, добавлен в.NET 2.0, присутствует во внутренней функции CLR с именем MsgWaitHelper(). Исходный код для.NET 2.0 доступен через дистрибутив SSCLI20. Очень полный, но источник для MsgWaitHelper () не включен. Довольно необычно. Декомпиляция это скорее безнадежное дело, оно очень большое.

Единственное, что нужно убрать из его поста в блоге, это опасность повторного входа. Накачка потока STA опасна из-за его способности отправлять сообщения Windows и получать произвольный код для выполнения, когда ваша программа не находится в правильном состоянии, чтобы такой код мог выполняться. То, что большинство программистов VB6 знает, когда он использовал DoEvents(), чтобы получить модальный цикл в своем коде, чтобы остановить зависание пользовательского интерфейса. Я написал пост о его наиболее типичных опасностях. MsgWaitHelper() выполняет именно этот тип прокачки, однако очень избирательно относится к тому, какой именно код он позволяет запускать.

Вы можете получить некоторое представление о том, что он делает внутри вашей тестовой программы, запустив программу без подключенного отладчика, а затем подключив неуправляемый отладчик. Вы увидите, что это блокирует NtWaitForMultipleObjects(). Я сделал еще один шаг вперед и установил точку останова на PeekMessageW(), чтобы получить эту трассировку стека:

user32.dll!PeekMessageW()   Unknown
combase.dll!CCliModalLoop::MyPeekMessage(tagMSG * pMsg, HWND__ * hwnd, unsigned int min, unsigned int max, unsigned short wFlag) Line 2305  C++
combase.dll!CCliModalLoop::PeekRPCAndDDEMessage() Line 2008 C++
combase.dll!CCliModalLoop::FindMessage(unsigned long dwStatus) Line 2087    C++
combase.dll!CCliModalLoop::HandleWakeForMsg() Line 1707 C++
combase.dll!CCliModalLoop::BlockFn(void * * ahEvent, unsigned long cEvents, unsigned long * lpdwSignaled) Line 1645 C++
combase.dll!ClassicSTAThreadWaitForHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * pdwIndex) Line 46 C++
combase.dll!CoWaitForMultipleHandles(unsigned long dwFlags, unsigned long dwTimeout, unsigned long cHandles, void * * pHandles, unsigned long * lpdwindex) Line 120 C++
clr.dll!MsgWaitHelper(int,void * *,int,unsigned long,int)   Unknown
clr.dll!Thread::DoAppropriateWaitWorker(int,void * *,int,unsigned long,enum WaitMode)   Unknown
clr.dll!Thread::DoAppropriateWait(int,void * *,int,unsigned long,enum WaitMode,struct PendingSync *)    Unknown
clr.dll!CLREventBase::WaitEx(unsigned long,enum WaitMode,struct PendingSync *)  Unknown
clr.dll!CLREventBase::Wait(unsigned long,int,struct PendingSync *)  Unknown
clr.dll!Thread::Block(int,struct PendingSync *) Unknown
clr.dll!SyncBlock::Wait(int,int)    Unknown
clr.dll!ObjectNative::WaitTimeout(bool,int,class Object *)  Unknown

Помните, что я записал эту трассировку стека в Windows 8.1, в старых версиях Windows она будет выглядеть иначе. Модальный цикл COM был в значительной степени изменен в Windows 8, это также очень важно для программ WinRT. Не знаю много об этом, но, похоже, у него есть другая модель потоков STA, названная ASTA, которая делает более ограниченный тип прокачки, закрепленный в добавленном CoWaitForMultipleObjects ()

ObjectNative:: WaitTimeout () - это место, где SemaphoreSlim.Wait() внутри метода BlockingCollection.Take() начинает выполнять код CLR. Вы видите, как он пробивается сквозь уровни внутреннего кода CLR, чтобы достичь мифической функции MsgWaitHelper (), а затем переключается на печально известный цикл модального диспетчера COM.

Сигнал летучей мыши о том, что он выполняет "неправильную" прокачку в вашей программе, - это вызов метода CliModalLoop::PeekRPCAndDDEMessage(). Другими словами, рассматривается только вид сообщений о взаимодействии, которые публикуются в определенном внутреннем окне, которое отправляет вызовы COM, которые пересекают границу квартиры. Это не будет качать сообщения, которые находятся в очереди сообщений для вашего собственного окна.

Это понятное поведение, Windows может быть абсолютно уверена, что повторный вход не убьет вашу программу, когда он увидит, что ваш поток пользовательского интерфейса простаивает. Он простаивает, когда прокачивает сам цикл сообщений, вызов PeekMessage() или GetMessage() указывает на это состояние. Проблема в том, что вы не качаете себя. Вы нарушили основной контракт потока STA, он должен прокачать цикл сообщений. Надеяться на то, что модальная петля COM сделает за вас работу, - пустая надежда.

Вы можете исправить это, хотя я не рекомендую это делать. CLR предоставит это самому приложению для ожидания с помощью правильно сконструированного объекта SynchronizationContext.Current. Вы можете создать его, создав собственный класс и переопределив метод Wait (). Вызовите метод SetWaitNotificationRequired(), чтобы убедить CLR, что он должен оставить это на ваше усмотрение. Неполная версия, демонстрирующая подход:

class MySynchronizationProvider : System.Threading.SynchronizationContext {
    public MySynchronizationProvider() {
        base.SetWaitNotificationRequired();
    }
    public override int Wait(IntPtr[] waitHandles, bool waitAll, int millisecondsTimeout) {
        for (; ; ) {
            int result = MsgWaitForMultipleObjects(waitHandles.Length, waitHandles, waitAll, millisecondsTimeout, 8);
            if (result == waitHandles.Length) System.Windows.Forms.Application.DoEvents();
            else return result;
        }
    }
    [DllImport("user32.dll")]
    private static extern int MsgWaitForMultipleObjects(int cnt, IntPtr[] waitHandles, bool waitAll,
        int millisecondTimeout, int mask);        
}

И установите его в начале вашей темы:

    System.ComponentModel.AsyncOperationManager.SynchronizationContext =
        new MySynchronizationProvider();

Теперь вы увидите, как отправляется ваше сообщение WM_TEST. Это вызов Application.DoEvents(), который отправил его. Я мог бы скрыть это, используя PeekMessage + DispatchMessage, но это скрыло бы опасность этого кода, лучше не вставлять DoEvents() под таблицу. Вы действительно играете в очень опасную повторную игру здесь. Не используйте этот код.

Короче говоря, единственная надежда на правильное использование StaThreadScheduler - это когда он используется в коде, который уже реализовал контракт STA, и должен работать так, как это делает поток STA. На самом деле это было предназначено в качестве лейкопластыря для старого кода, где вам не нужно роскошь контролировать состояние потока. Как и любой код, который начал жизнь в программе VB6 или надстройке Office. Немного поэкспериментируя с этим, я не думаю, что на самом деле это может сработать. Также примечательно, что необходимость в этом должна быть полностью устранена с доступностью asych/await.

Другие вопросы по тегам