gpt4 book ai didi

c# - 如果中间有 linq 方法,rx 处理异常订阅

转载 作者:太空宇宙 更新时间:2023-11-03 18:08:14 24 4
gpt4 key购买 nike

当我订阅一个有时会抛出异常的方法时,我会得到两种不同的行为。如果我在中间连接 LINQ 方法,订阅就会被释放,否则不会,为什么?

void main(){
var numbersSubject=new Subject<int>();

numbersSubject.subscribe(throwMethod); // 1,2,3,4,6,7,8,9,10
// numbersSubject.select(num=>num).subscribe(throwMethod); // 1,2,3,4

for(int i=0;i<10;i++)
{
try{
numbersSubject.OnNext(i);
}catch{}
}
}

void throwMethod(int num)
{
if(num==5)
throw new Exception();
Console.writeLine(i);
}

最佳答案

所以,澄清一下:

当版本没有 LINQ 运算符运行时,我们看到:

0,1,2,3,4,6,7,8,9

带有 LINQ 运算符的版本运行时,我们看到:

0,1,2,3,4

值得注意的是,当你订阅一个好的和坏的行为观察者到第二个版本时,你会得到如下所示的输出(注释中的输出),如下所示:

numbersSubject.Subscribe(throwMethod);   
var source = numbersSubject.Select(num=>num);
source.Subscribe(Console.WriteLine); // 0,1,2,3,4,5,6,7,8,9
source.Subscribe(throwMethod); // 0,1,2,3,4

请注意,“好的”观察者会获取所有事件。

原因是内置的操作符有一个保护层来处理坏观察者的订阅

从Rx源码我们看到:

适当的资源清理需要保护管道免受流氓观察者的侵害。考虑以下示例:

var xs  = Observable.Interval(TimeSpan.FromSeconds(1));
var ys = <some random sequence>;
var res = xs.CombineLatest(ys, (x, y) => x + y);

上面查询的弹珠图如下所示:

xs  -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
| | | | | | | | |
ys --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
| | | | | | | | | | | | | |
v v v v v v v v v v v v v v
res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
|
@#&

请注意 Rx 的自由线程特性,其中结果序列上的消息由 CombineLatest 的两个输入序列中的任何一个生成。

现在假设在 res 的观察者的 OnNext 回调中发生异常,在上面用 @#& 标记的指定点。回调在 ys 的上下文中运行,因此异常将关闭 ys 的调度程序线程。这本身就是一个问题(可以通过 IScheduler 上的 Catch 运算符来缓解),但请注意生成 xs 的计时器是如何保持的活着。

安全保护代码确保在用户回调抛出时处理获取的资源。


以上是在名为 AutoDetachObserver 的内部类中实现的,并由大多数内部运算符使用的 SafeObserver 包装。

所有这一切都是用 try...finally 异常处理程序包装每个 OnXXX 调用,异常处理程序的 finally block 在发生错误时处理订阅 - 例如OnNext 看起来像:

        var __noError = false;
try
{
observer.OnNext(value);
__noError = true;
}
finally
{
if (!__noError)
Dispose();
}

Subject,出于性能原因,没有这一层保护。添加它(并防止其他滥用)的一种快速方法是将 Synchronize() 运算符添加到主题。例如:

var numbersSubject = new Subject<int>();
var source = numbersSubject.Synchronize();
source.Subscribe(throwMethod);

会输出

0,1,2,3,4,6,7,8,9

但是添加Synchronize如下所示:

var numbersSubject = new Subject<int>();
var source = numbersSubject.Synchronize();
source.Synchronize().Subscribe(throwMethod);

会输出

0,1,2,3,4

与其他内置运算符一致(加上您使用 Observable.Create 实现的任何运算符)。

(EDIT) 处理观察者异常的注意事项

在评论的提示下,这里有一些关于处理观察者抛出的异常的附加说明。

当 OnNext 处理程序中出现异常时,它位于堆栈中 Rx 代码的下方,因此无法负责任地将其“返回”给用户。此时用户必须被视为已死亡。我们能合理地做的就是处理订阅并清理由于订阅而产生的资源。这是基于推送的代码的结果。与 IEnumerable 对比,后者可以向客户端代码抛出异常,因为它是客户端进行拉动。

请注意,某些运算符包含用户提供的逻辑(如 Where 运算符中的谓词表达式),通过 OnError 传播错误到观察者的 channel ,但是一旦观察者通过在它自己的代码中抛出异常而死亡 - 就是这样。它不会再通过任何 OnXXX 方法调用。

从大约一半的地方开始,还有更多内容 this epic Bart de Smet post .

关于c# - 如果中间有 linq 方法,rx 处理异常订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22072385/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com