gpt4 book ai didi

c# - 为什么每个观察委托(delegate)都在一个新线程上运行

转载 作者:行者123 更新时间:2023-11-30 17:13:57 27 4
gpt4 key购买 nike

在 Rx 中,当对 ObserveOn 方法使用 Scheduler.NewThread 时,当 Rx 已经保证 OnNexts 永远不会重叠时,让每个观察委托(delegate) (OnNext) 在新线程上运行的优势是什么。如果每个 OnNext 都将被一个接一个地调用,为什么每个都需要新线程。

我理解为什么人们想要在不同于订阅和应用程序线程的线程上运行观察委托(delegate),但在它们永远不会并行运行时在新线程上运行每个观察委托(delegate)?......没有意义我还是我在这里遗漏了什么?

例如

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace RxTesting
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);

var numbers = from number in Enumerable.Range(1,10) select Process(number);

var observableNumbers = numbers.ToObservable()
.ObserveOn(Scheduler.NewThread)
.SubscribeOn(Scheduler.NewThread);

observableNumbers.Subscribe(
n => Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId));

Console.ReadKey();
}

private static int Process(int number)
{
Thread.Sleep(500);
Console.WriteLine("Producing : {0} \t on Thread : {1}", number,
Thread.CurrentThread.ManagedThreadId);

return number;
}
}
}

以上代码产生以下结果。请注意,消费每次都在一个新线程上完成。

Application Thread : 8
Producing : 1 on Thread : 9
Consuming : 1 on Thread : 10
Producing : 2 on Thread : 9
Consuming : 2 on Thread : 11
Producing : 3 on Thread : 9
Consuming : 3 on Thread : 12
Producing : 4 on Thread : 9
Consuming : 4 on Thread : 13
Producing : 5 on Thread : 9
Consuming : 5 on Thread : 14
Producing : 6 on Thread : 9
Consuming : 6 on Thread : 15
Producing : 7 on Thread : 9
Consuming : 7 on Thread : 16
Producing : 8 on Thread : 9
Consuming : 8 on Thread : 17
Producing : 9 on Thread : 9
Consuming : 9 on Thread : 18
Producing : 10 on Thread : 9
Consuming : 10 on Thread : 19

最佳答案

NewThread 调度程序对长期运行的订阅者很有用。如果您不指定任何调度程序,生产者将被阻塞等待订阅者完成。通常,您可以使用 Scheduler.ThreadPool,但如果您希望有许多长时间运行的任务,您将不希望它们阻塞线程池(因为它可能不仅仅被单个可观察对象的订阅者使用).

例如,考虑对您的示例进行以下修改。我将延迟移至订阅者并添加了主线程何时准备好进行键盘输入的指示。注意取消注释 NewThead 行时的区别。

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

namespace RxTesting
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Application Thread : {0}", Thread.CurrentThread.ManagedThreadId);

var numbers = from number in Enumerable.Range(1, 10) select Process(number);

var observableNumbers = numbers.ToObservable()
// .ObserveOn(Scheduler.NewThread)
// .SubscribeOn(Scheduler.NewThread)
;

observableNumbers.Subscribe(
n => {
Thread.Sleep(500);
Console.WriteLine("Consuming : {0} \t on Thread : {1}", n, Thread.CurrentThread.ManagedThreadId);
});

Console.WriteLine("Waiting for keyboard");
Console.ReadKey();
}

private static int Process(int number)
{
Console.WriteLine("Producing : {0} \t on Thread : {1}", number,
Thread.CurrentThread.ManagedThreadId);

return number;
}
}
}

那么为什么 Rx 不优化为每个订阅者使用相同的线程呢?如果订阅者运行的时间太长以至于您需要一个新线程,那么创建线程的开销无论如何都是微不足道的。一个异常(exception)是,如果大多数订阅者运行时间较短,而少数订阅者运行时间较长,那么重用同一线程的优化确实很有用。

关于c# - 为什么每个观察委托(delegate)都在一个新线程上运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/9220689/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com