gpt4 book ai didi

c# - Observable.ObserveOn() 似乎没有效果

转载 作者:太空狗 更新时间:2023-10-30 01:18:23 26 4
gpt4 key购买 nike

我正在尝试使用 Rx 并行处理项目。看来我不能告诉 Rx 并行运行我的观察者的 OnNext() 。下面是测试代码来演示

[Test]
public void ObservableObserveOnNewThreadRunsInParallel()
{
Console.WriteLine("Starting thread: {0}", Thread.CurrentThread.ManagedThreadId);

// store items as they are output
var list = new List<Tuple<string, int, int, int, TimeSpan>>();

// used to wait until a sequences are complete
var ev = new AutoResetEvent(false);

// try these schedulers
var schedulers = new[] {
Tuple.Create("ThreadPoolScheduler.Instance", (IScheduler)ThreadPoolScheduler.Instance),
Tuple.Create("NewThreadScheduler.Default", (IScheduler)NewThreadScheduler.Default),
Tuple.Create("TaskPoolScheduler.Default", (IScheduler)TaskPoolScheduler.Default),
Tuple.Create("Scheduler.Default", (IScheduler)Scheduler.Default),
Tuple.Create("Scheduler.Immediate", (IScheduler)Scheduler.Immediate),
};

// try each scheduler
foreach (var schedulerTuple in
schedulers) {

// emit tuples <i, delay> where delay decreases with each new tuple
// such that output timing is expected to be reversed
var observable =
Observable.Range(0, 5)
.Select(i => Tuple.Create((int)i, (int)(500 - i * 100)))
.Take(5);

var dt = DateTime.Now;
Tuple<string, IScheduler> scheduler = schedulerTuple;


observable
// specify the scheduler to use
.ObserveOn(schedulerTuple.Item2)
.Subscribe(
v => {
// emulate some work (first items take longer than last items)
Thread.Sleep(v.Item2);

// record when the item is done recording
lock (list)
list.Add(
Tuple.Create(
scheduler.Item1,
v.Item1,
v.Item2,
Thread.CurrentThread.ManagedThreadId,
dt - DateTime.Now));
},
// let the test go on
() => ev.Set());

// wait until the end of the sequence
ev.WaitOne();
}

// print observed order
foreach (var i in list) {
Console.WriteLine(i);
}
}

输出:

Starting thread: 5
(ThreadPoolScheduler.Instance, 0, 500, 9, -00:00:04.2514251)
(ThreadPoolScheduler.Instance, 1, 400, 9, -00:00:04.6524652)
(ThreadPoolScheduler.Instance, 2, 300, 9, -00:00:04.9524952)
(ThreadPoolScheduler.Instance, 3, 200, 9, -00:00:05.1525152)
(ThreadPoolScheduler.Instance, 4, 100, 9, -00:00:05.2525252)
(NewThreadScheduler.Default, 0, 500, 11, -00:00:06.5066506)
(NewThreadScheduler.Default, 1, 400, 11, -00:00:06.9066906)
(NewThreadScheduler.Default, 2, 300, 11, -00:00:07.2067206)
(NewThreadScheduler.Default, 3, 200, 11, -00:00:07.4067406)
(NewThreadScheduler.Default, 4, 100, 11, -00:00:07.5067506)
(TaskPoolScheduler.Default, 0, 500, 12, -00:00:00.5020502)
(TaskPoolScheduler.Default, 1, 400, 12, -00:00:00.9020902)
(TaskPoolScheduler.Default, 2, 300, 12, -00:00:01.2021202)
(TaskPoolScheduler.Default, 3, 200, 12, -00:00:01.4021402)
(TaskPoolScheduler.Default, 4, 100, 12, -00:00:01.5021502)
(Scheduler.Default, 0, 500, 13, -00:00:00.5020502)
(Scheduler.Default, 1, 400, 13, -00:00:00.9020902)
(Scheduler.Default, 2, 300, 13, -00:00:01.2021202)
(Scheduler.Default, 3, 200, 13, -00:00:01.4021402)
(Scheduler.Default, 4, 100, 13, -00:00:01.5021502)
(Scheduler.Immediate, 0, 500, 5, -00:00:00.5020502)
(Scheduler.Immediate, 1, 400, 5, -00:00:00.9040904)
(Scheduler.Immediate, 2, 300, 5, -00:00:01.2041204)
(Scheduler.Immediate, 3, 200, 5, -00:00:01.4041404)
(Scheduler.Immediate, 4, 100, 5, -00:00:01.5041504)

请注意,即使我明确使用 ObserveOn() 来指定用于通知的调度程序,但每个 OnNext 调用似乎都在等待上一个调用。

我希望除 Scheduler.Immediate 之外的所有通知都能并行运行。

有人知道我做错了什么吗?

最佳答案

这是设计使然。 Rx 的主要契约之一是所有通知都必须序列化。

参见 Rx Design Guidelines 中的 §§4.2、6.7 .

Observables 表示 Rx 中的并发性,因此要有重叠的通知需要两个或更多的 observables。通知不会在同一个观察者中重叠,但它们会针对每个观察者重叠。

例如,如果您需要同时执行两个方法(观察者),那么您需要定义两个可观察对象。

从技术上讲,并发所需的是观察者(订阅)而不是可观察对象;因此,订阅同一个 cold observable 两次可以产生并发,这取决于 observable 使用的调度程序;然而,订阅同一个 hot observable 两次不会导致并发。 (请参阅我的博文:Hot and Cold Observables。)

ObserveOn 在通过引入并发的调度程序时引入并发。但它如何在不违反 §6.7 契约(Contract)的情况下做到这一点?好吧,它将 observable 分成两个 observable:before operator 和after operator!或者,您可以将其视为两个订阅或观察者:之前之后before 观察者是 ObserveOn 提供的内部观察者。 after 观察者是您的观察者,或者是查询中下一个运算符提供的观察者。

无论您如何看待,之前 可观察对象中的通知可以与之后 可观察对象中的通知同时发生。但是 after 观察者只会在 after 可观察者的上下文中接收序列化通知。

关于c# - Observable.ObserveOn() 似乎没有效果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27549737/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com