gpt4 book ai didi

c# - 并行处理 Rx 事件

转载 作者:行者123 更新时间:2023-12-05 00:03:00 26 4
gpt4 key购买 nike

如果我说

void ChangeQuote(string symbol, double bid, double ask)
{
}

string[] TechETF = {"AAPL", "MSFT"};
var quotesSubscription = quotesObservable.Where(quote => (TechETF.Contains(quote.Symbol)));

quotesSubscription.Subscribe(quote => this.ChangeQuote(quote.Symbol, quote.Bid, quote.Ask));

如果底层事件是异步触发的,这段代码是否在 subscribe() 中?还在他们自己的线程(线程池)上异步处理这些回调?或者这段代码是否序列化(阻止)这些回调的处理?

最佳答案

Rx 总是序列化调用。这是行为契约的一部分。

这里有一个例子来说明情况确实如此。

从两个计时器开始:

var timers = new []
{
new System.Timers.Timer()
{
Interval = 200.0,
AutoReset = true,
Enabled = true,
},
new System.Timers.Timer()
{
Interval = 250.0,
AutoReset = true,
Enabled = true,
},
};

现在从计时器构建两个可观察对象:
var observables = new []
{
Observable.FromEventPattern
<System.Timers.ElapsedEventHandler, System.Timers.ElapsedEventArgs>
(h => timers[0].Elapsed += h, h => timers[0].Elapsed -= h),
Observable.FromEventPattern
<System.Timers.ElapsedEventHandler, System.Timers.ElapsedEventArgs>
(h => timers[1].Elapsed += h, h => timers[1].Elapsed -= h),
};

然后将它们合并为一个可观察对象:
var observable = observables.Merge();

最后订阅看看会发生什么:
observable
.Subscribe(ep =>
{
Console.WriteLine("Start: " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(2000);
Console.WriteLine("End: " + Thread.CurrentThread.ManagedThreadId);
});

我得到的结果是这样的:
Start: 15
End: 15
Start: 11
End: 11
Start: 14
End: 14
Start: 10
End: 10
Start: 4
End: 4
Start: 13
End: 13
Start: 12
End: 12
Start: 3
End: 3

因此,即使我的计时器彼此异步触发,Rx 也确保单个订阅始终按顺序处理每个值,即使它们进入不同的线程。

关于c# - 并行处理 Rx 事件,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31971952/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com