gpt4 book ai didi

c# - StaTaskScheduler和STA线程消息泵送

转载 作者:IT王子 更新时间:2023-10-29 04:16:14 26 4
gpt4 key购买 nike

TL; DR:StaTaskScheduler运行的任务内的死锁。长版:

我正在使用Parallel Team的 StaTaskScheduler 中的ParallelExtensionsExtras托管第三方提供的一些旧版STA COM对象。 StaTaskScheduler实现细节的描述如下:

The good news is that TPL’s implementation is able to run on either MTA or STA threads, and takes into account relevant differences around underlying APIs like WaitHandle.WaitAll (which only supports MTA threads when the method is provided multiple wait handles).



我认为这意味着TPL的阻塞部分将使用等待API,该API会泵送诸如 CoWaitForMultipleHandles 之类的消息,以避免在STA线程上调用时出现死锁情况。

在我的情况下,我相信正在发生以下情况:进程内STA COM对象A对进程外对象B进行调用,然后期望通过B进行回调,作为传出调用的一部分。

以简化形式:
var result = await Task.Factory.StartNew(() =>
{
// in-proc object A
var a = new A();
// out-of-proc object B
var b = new B();
// A calls B and B calls back A during the Method call
return a.Method(b);
}, CancellationToken.None, TaskCreationOptions.None, staTaskScheduler);

问题是, a.Method(b)永不返回。据我所知,发生这种情况是因为 BlockingCollection<Task>内部某处的阻塞等待不会泵送消息,因此我对引用语句的假设可能是错误的。

编辑后的在测试WinForms应用程序的UI线程上执行时,相同的代码也起作用(即,向 TaskScheduler.FromCurrentSynchronizationContext()提供 staTaskScheduler而不是 Task.Factory.StartNew)。

解决这个问题的正确方法是什么?我是否应该实现一个自定义同步上下文,该上下文将显式地使用 CoWaitForMultipleHandles泵送消息并将其安装在由 StaTaskScheduler启动的每个STA线程上?

如果是这样, BlockingCollection的底层实现是否将调用我的 SynchronizationContext.Wait 方法?我可以使用 SynchronizationContext.WaitHelper 来实现 SynchronizationContext.Wait吗?

编辑了,其中包含一些代码,表明在执行阻塞等待时托管STA线程不会泵送。该代码是一个完整的控制台应用程序,可以复制/粘贴/运行:
using System;
using System.Collections.Concurrent;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleTestApp
{
class Program
{
// start and run an STA thread
static void RunStaThread(bool pump)
{
// test a blocking wait with BlockingCollection.Take
var tasks = new BlockingCollection<Task>();

var thread = new Thread(() =>
{
// Create a simple Win32 window
var hwndStatic = NativeMethods.CreateWindowEx(0, "Static", String.Empty, NativeMethods.WS_POPUP,
0, 0, 0, 0, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero, IntPtr.Zero);

// subclass it with a custom WndProc
IntPtr prevWndProc = IntPtr.Zero;

var newWndProc = new NativeMethods.WndProc((hwnd, msg, wParam, lParam) =>
{
if (msg == NativeMethods.WM_TEST)
Console.WriteLine("WM_TEST processed");
return NativeMethods.CallWindowProc(prevWndProc, hwnd, msg, wParam, lParam);
});

prevWndProc = NativeMethods.SetWindowLong(hwndStatic, NativeMethods.GWL_WNDPROC, newWndProc);
if (prevWndProc == IntPtr.Zero)
throw new ApplicationException();

// post a test WM_TEST message to it
NativeMethods.PostMessage(hwndStatic, NativeMethods.WM_TEST, IntPtr.Zero, IntPtr.Zero);

// BlockingCollection blocks without pumping, NativeMethods.WM_TEST never arrives
try { var task = tasks.Take(); }
catch (Exception e) { Console.WriteLine(e.Message); }

if (pump)
{
// NativeMethods.WM_TEST will arrive, because Win32 MessageBox pumps
Console.WriteLine("Now start pumping...");
NativeMethods.MessageBox(IntPtr.Zero, "Pumping messages, press OK to stop...", String.Empty, 0);
}
});

thread.SetApartmentState(ApartmentState.STA);
thread.Start();

Thread.Sleep(2000);

// this causes the STA thread to end
tasks.CompleteAdding();

thread.Join();
}

static void Main(string[] args)
{
Console.WriteLine("Testing without pumping...");
RunStaThread(false);

Console.WriteLine("\nTest with pumping...");
RunStaThread(true);

Console.WriteLine("Press Enter to exit");
Console.ReadLine();
}
}

// Interop
static class NativeMethods
{
[DllImport("user32")]
public static extern IntPtr SetWindowLong(IntPtr hwnd, int nIndex, WndProc newProc);

[DllImport("user32")]
public static extern IntPtr CallWindowProc(IntPtr lpPrevWndFunc, IntPtr hwnd, int msg, int wParam, int lParam);

[DllImport("user32.dll")]
public static extern IntPtr CreateWindowEx(int dwExStyle, string lpClassName, string lpWindowName, int dwStyle, int x, int y, int nWidth, int nHeight, IntPtr hWndParent, IntPtr hMenu, IntPtr hInstance, IntPtr lpParam);

[DllImport("user32.dll")]
public static extern bool PostMessage(IntPtr hwnd, uint msg, IntPtr wParam, IntPtr lParam);

[DllImport("user32.dll")]
public static extern int MessageBox(IntPtr hwnd, string text, String caption, int options);

public delegate IntPtr WndProc(IntPtr hwnd, int msg, int wParam, int lParam);

public const int GWL_WNDPROC = -4;
public const int WS_POPUP = unchecked((int)0x80000000);
public const int WM_USER = 0x0400;

public const int WM_TEST = WM_USER + 1;
}
}

产生输出:

测试无需抽水...
collection参数为空,并已被标记为完成添加。

用泵测试...
collection参数为空,并已被标记为完成添加。
现在开始抽水...
WM_TEST已处理
按Enter退出

最佳答案

我对您的问题的理解:您仅使用 StaTaskScheduler 来为传统的COM对象组织经典的COM STA公寓。您是而不是,它在StaTaskScheduler的STA线程上运行WinForms或WPF核心消息循环。也就是说,您在该线程中没有使用Application.RunApplication.DoEventsDispatcher.PushFrame之类的东西。如果这是一个错误的假设,请纠正我。

就其本身而言,StaTaskScheduler 不会在其创建的STA线程上安装任何同步上下文。因此,您依赖CLR为您发送消息。我仅在克里斯·布鲁姆(Chris Brumme)的Apartments and Pumping in the CLR中发现了一个隐式确认,即CLR在STA线程上运行:

I keep saying that managed blocking will perform “some pumping” when called on an STA thread. Wouldn’t it be great to know exactly what will get pumped? Unfortunately, pumping is a black art which is beyond mortal comprehension. On Win2000 and up, we simply delegate to OLE32’s CoWaitForMultipleHandles service.



这表明CLR在内部为STA线程使用 CoWaitForMultipleHandles 。此外, COWAIT_DISPATCH_WINDOW_MESSAGES标志 mention this的MSDN文档:

... in STA is only a small set of special-cased messages dispatched.



我做了 some research on that,但是无法通过 WM_TEST从示例代码中抽取 CoWaitForMultipleHandles,我们在对您的问题的评论中进行了讨论。我的理解是,前面提到的一小组特殊情况的消息 实际上将限制为某些COM编码特定的消息,并且不包括任何常规的通用消息,例如 WM_TEST

因此,回答您的问题:

... Should I implemented a custom synchronization context, which would explicitly pump messages with CoWaitForMultipleHandles, and install it on each STA thread started by StaTaskScheduler?



是的,我相信创建自定义同步上下文并覆盖 SynchronizationContext.Wait确实是正确的解决方案。

但是,您应该避免使用 CoWaitForMultipleHandles,而 使用 MsgWaitForMultipleObjectsEx 而不是。如果 MsgWaitForMultipleObjectsEx指示队列中有待处理的消息,则应手动使用 PeekMessage(PM_REMOVE)DispatchMessage泵送它。然后,您应该继续等待所有在相同 SynchronizationContext.Wait调用中的句柄。

注意 MsgWaitForMultipleObjectsEx MsgWaitForMultipleObjects 之间有一个细微但重要的区别。如果队列中已经看到一条消息(例如 PeekMessage(PM_NOREMOVE)GetQueueStatus),但后者未删除,则后者不会返回并保持阻塞状态。这对泵送不利,因为您的COM对象可能正在使用 PeekMessage之类的东西来检查消息队列。稍后可能会导致 MsgWaitForMultipleObjects在不期望的时候阻塞。

OTOH,带有 MsgWaitForMultipleObjectsEx标志的 MWMO_INPUTAVAILABLE没有这种缺点,在这种情况下会返回。

前一阵子,我创建了一个自定义版本的 StaTaskScheduler( available here as ThreadAffinityTaskScheduler )来尝试解决 different problem:为后续的 await延续保留一个具有线程相似性的线程池。如果您在多个 awaits中使用STA COM对象,则线程亲和性为 。原始 StaTaskScheduler 仅在其池限制为1个线程时才显示此行为。

因此,我继续进行了更多的 WM_TEST案例实验。最初,我在STA线程上安装了标准 SynchronizationContext 类的实例。没有收到 WM_TEST消息,这是预期的。

然后,我重写了 SynchronizationContext.Wait ,将其转发给 SynchronizationContext.WaitHelper 。它确实被调用了,但是仍然没有启动。

最后,我实现了功能齐全的消息泵循环,这是它的核心部分:
// the core loop
var msg = new NativeMethods.MSG();
while (true)
{
// MsgWaitForMultipleObjectsEx with MWMO_INPUTAVAILABLE returns,
// even if there's a message already seen but not removed in the message queue
nativeResult = NativeMethods.MsgWaitForMultipleObjectsEx(
count, waitHandles,
(uint)remainingTimeout,
QS_MASK,
NativeMethods.MWMO_INPUTAVAILABLE);

if (IsNativeWaitSuccessful(count, nativeResult, out managedResult) || WaitHandle.WaitTimeout == managedResult)
return managedResult;

// there is a message, pump and dispatch it
if (NativeMethods.PeekMessage(out msg, IntPtr.Zero, 0, 0, NativeMethods.PM_REMOVE))
{
NativeMethods.TranslateMessage(ref msg);
NativeMethods.DispatchMessage(ref msg);
}
if (hasTimedOut())
return WaitHandle.WaitTimeout;
}

这确实起作用,WM_TEST被抽出。 以下是测试的适应版本:
public static async Task RunAsync()
{
using (var staThread = new Noseratio.ThreadAffinity.ThreadWithAffinityContext(staThread: true, pumpMessages: true))
{
Console.WriteLine("Initial thread #" + Thread.CurrentThread.ManagedThreadId);
await staThread.Run(async () =>
{
Console.WriteLine("On STA thread #" + Thread.CurrentThread.ManagedThreadId);
// create a simple Win32 window
IntPtr hwnd = CreateTestWindow();

// Post some WM_TEST messages
Console.WriteLine("Post some WM_TEST messages...");
NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(1), IntPtr.Zero);
NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(2), IntPtr.Zero);
NativeMethods.PostMessage(hwnd, NativeMethods.WM_TEST, new IntPtr(3), IntPtr.Zero);
Console.WriteLine("Press Enter to continue...");
await ReadLineAsync();

Console.WriteLine("After await, thread #" + Thread.CurrentThread.ManagedThreadId);
Console.WriteLine("Pending messages in the queue: " + (NativeMethods.GetQueueStatus(0x1FF) >> 16 != 0));

Console.WriteLine("Exiting STA thread #" + Thread.CurrentThread.ManagedThreadId);
}, CancellationToken.None);
}
Console.WriteLine("Current thread #" + Thread.CurrentThread.ManagedThreadId);
}

输出:

初始线程#9
在STA线程#10上
发布一些WM_TEST消息...
按Enter继续...
WM_TEST已处理:1
WM_TEST已处理:2
WM_TEST已处理:3

等待之后,线程#10
队列中的待处理消息:False
退出STA线程#10
当前线程#12
按任何一个键退出

请注意,此实现同时支持线程亲缘关系(它在 await之后停留在线程#10上)和消息泵送。完整的源代码包含可重用的部分( ThreadAffinityTaskSchedulerThreadWithAffinityContext),并且可以使用 here as self-contained console app。它尚未经过全面测试,因此使用时需您自担风险。

关于c# - StaTaskScheduler和STA线程消息泵送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/21211998/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com