gpt4 book ai didi

c# - 为什么 RX 不在新线程上运行处理程序?

转载 作者:行者123 更新时间:2023-11-30 16:03:40 24 4
gpt4 key购买 nike

我正在使用 RX 2.2.5

在下面的示例中,我希望处理程序(Subscribe() 的委托(delegate))在新线程上运行,但是在运行应用程序时,所有 10 个数字都在同一个线程上使用一个接一个。

Console.WriteLine("Main Thread: {0}", Thread.CurrentThread.ManagedThreadId);

var source = Observable.Range(1, 10, TaskPoolScheduler.Default)
.ObserveOn(NewThreadScheduler.Default)
.Subscribe(n =>
{
Thread.Sleep(1000);
Console.WriteLine(
"Value: {0} Thread: {1} IsPool: {2}",
n, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
});

输出:

Main Thread: 1
Value: 1 on Thread: 4 IsPool: False
Value: 2 on Thread: 4 IsPool: False
Value: 3 on Thread: 4 IsPool: False
Value: 4 on Thread: 4 IsPool: False
Value: 5 on Thread: 4 IsPool: False
Value: 6 on Thread: 4 IsPool: False
Value: 7 on Thread: 4 IsPool: False
Value: 8 on Thread: 4 IsPool: False
Value: 9 on Thread: 4 IsPool: False
Value: 10 on Thread: 4 IsPool: False

它们按顺序运行的事实也是一个谜,因为我使用 TaskPoolScheduler 来生成数字。

即使我将 NewThreadScheduler 替换为 TaskPoolSchedulerThreadPoolScheduler 我仍然得到一个线程,更有趣的是在这两种情况下Thread.CurrentThread.IsThreadPoolThreadFalse

我无法解释这种行为,因为当我查看 ThreadPoolScheduler 时,我看到了:

public override IDisposable Schedule<TState>(TState state, Func<IScheduler, TState, IDisposable> action)
{
if (action == null)
throw new ArgumentNullException("action");
SingleAssignmentDisposable d = new SingleAssignmentDisposable();
ThreadPool.QueueUserWorkItem((WaitCallback) (_ =>
{
if (d.IsDisposed)
return;
d.Disposable = action((IScheduler) this, state);
}), (object) null);
return (IDisposable) d;
}

我可以清楚地看到 ThreadPool.QueueUserWorkItem... 那么为什么 IsPool == False

我在这里错过了什么?

最佳答案

根据我上面的评论,您似乎正在尝试使用 Rx(一个用于查询和组合可观察数据序列的库)作为并行计算库?

我想看到你期望的结果你可以把你的查询改成这样

Console.WriteLine("Main Thread: {0}", Thread.CurrentThread.ManagedThreadId);

var source = Observable.Range(1, 10, TaskPoolScheduler.Default)
.SelectMany(i=>Observable.Start(()=>i, NewThreadScheduler.Default))
//.ObserveOn(NewThreadScheduler.Default)
.Subscribe(n =>
{
Thread.Sleep(1000);
Console.WriteLine(
"Value: {0} Thread: {1} IsPool: {2}",
n, Thread.CurrentThread.ManagedThreadId, Thread.CurrentThread.IsThreadPoolThread);
});

输出:

Main Thread: 11
Value: 1 Thread: 15 IsPool: False
Value: 4 Thread: 21 IsPool: False
Value: 2 Thread: 14 IsPool: False
Value: 3 Thread: 13 IsPool: False
Value: 5 Thread: 21 IsPool: False
Value: 6 Thread: 21 IsPool: False
Value: 7 Thread: 21 IsPool: False
Value: 8 Thread: 21 IsPool: False
Value: 9 Thread: 21 IsPool: False
Value: 10 Thread: 21 IsPool: False

请注意,我们在这里看到了多个线程,但也注意到我们现在得到了无序的值(很明显,因为我们引入了我们不再控制的并发)。所以这是选择你的毒药(或你合适的库)的情况

关于c# - 为什么 RX 不在新线程上运行处理程序?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/36163620/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com