- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
当我订阅一个有时会抛出异常的方法时,我会得到两种不同的行为。如果我在中间连接 LINQ 方法,订阅就会被释放,否则不会,为什么?
void main(){
var numbersSubject=new Subject<int>();
numbersSubject.subscribe(throwMethod); // 1,2,3,4,6,7,8,9,10
// numbersSubject.select(num=>num).subscribe(throwMethod); // 1,2,3,4
for(int i=0;i<10;i++)
{
try{
numbersSubject.OnNext(i);
}catch{}
}
}
void throwMethod(int num)
{
if(num==5)
throw new Exception();
Console.writeLine(i);
}
最佳答案
所以,澄清一下:
当版本没有 LINQ 运算符运行时,我们看到:
0,1,2,3,4,6,7,8,9
当带有 LINQ 运算符的版本运行时,我们看到:
0,1,2,3,4
值得注意的是,当你订阅一个好的和坏的行为观察者到第二个版本时,你会得到如下所示的输出(注释中的输出),如下所示:
numbersSubject.Subscribe(throwMethod);
var source = numbersSubject.Select(num=>num);
source.Subscribe(Console.WriteLine); // 0,1,2,3,4,5,6,7,8,9
source.Subscribe(throwMethod); // 0,1,2,3,4
请注意,“好的”观察者会获取所有事件。
原因是内置的操作符有一个保护层来处理坏观察者的订阅。
从Rx源码我们看到:
适当的资源清理需要保护管道免受流氓观察者的侵害。考虑以下示例:
var xs = Observable.Interval(TimeSpan.FromSeconds(1));
var ys = <some random sequence>;
var res = xs.CombineLatest(ys, (x, y) => x + y);
上面查询的弹珠图如下所示:
xs -----0-----1-----2-----3-----4-----5-----6-----7-----8-----9---...
| | | | | | | | |
ys --------4--+--5--+-----+--2--+--1--+-----+-----+--0--+-----+---...
| | | | | | | | | | | | | |
v v v v v v v v v v v v v v
res --------4--5--6--7-----8--5--6--5--6-----7-----8--7--8-----9---...
|
@#&
请注意 Rx 的自由线程特性,其中结果序列上的消息由 CombineLatest
的两个输入序列中的任何一个生成。
现在假设在 res
的观察者的 OnNext
回调中发生异常,在上面用 @#&
标记的指定点。回调在 ys
的上下文中运行,因此异常将关闭 ys
的调度程序线程。这本身就是一个问题(可以通过 IScheduler
上的 Catch
运算符来缓解),但请注意生成 xs
的计时器是如何保持的活着。
安全保护代码确保在用户回调抛出时处理获取的资源。
以上是在名为 AutoDetachObserver
的内部类中实现的,并由大多数内部运算符使用的 SafeObserver
包装。
所有这一切都是用 try...finally
异常处理程序包装每个 OnXXX 调用,异常处理程序的 finally block 在发生错误时处理订阅 - 例如OnNext
看起来像:
var __noError = false;
try
{
observer.OnNext(value);
__noError = true;
}
finally
{
if (!__noError)
Dispose();
}
Subject
,出于性能原因,没有这一层保护。添加它(并防止其他滥用)的一种快速方法是将 Synchronize()
运算符添加到主题。例如:
var numbersSubject = new Subject<int>();
var source = numbersSubject.Synchronize();
source.Subscribe(throwMethod);
会输出
0,1,2,3,4,6,7,8,9
但是添加Synchronize
如下所示:
var numbersSubject = new Subject<int>();
var source = numbersSubject.Synchronize();
source.Synchronize().Subscribe(throwMethod);
会输出
0,1,2,3,4
与其他内置运算符一致(加上您使用 Observable.Create
实现的任何运算符)。
在评论的提示下,这里有一些关于处理观察者抛出的异常的附加说明。
当 OnNext 处理程序中出现异常时,它位于堆栈中 Rx 代码的下方,因此无法负责任地将其“返回”给用户。此时用户必须被视为已死亡。我们能合理地做的就是处理订阅并清理由于订阅而产生的资源。这是基于推送的代码的结果。与 IEnumerable 对比,后者可以向客户端代码抛出异常,因为它是客户端进行拉动。
请注意,某些运算符包含用户提供的逻辑(如 Where
运算符中的谓词表达式),将通过 OnError
传播错误到观察者的 channel ,但是一旦观察者通过在它自己的代码中抛出异常而死亡 - 就是这样。它不会再通过任何 OnXXX
方法调用。
从大约一半的地方开始,还有更多内容 this epic Bart de Smet post .
关于c# - 如果中间有 linq 方法,rx 处理异常订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22072385/
我用 Cocoapods 创建了一个简单的项目并安装了 RxSwift 和 RxCocoa。 我写了这段代码: import UIKit import RxSwift class ViewContro
我对 Rx 真的很陌生,只是想知道 subscribeOn 的顺序如何影响 Observable //This will not print anything Observable.just("wha
我有一个 PublishSubject 每 X 秒发出一个信号,我想只考虑 Y 秒后发出的第一个项目。 例子 observable A 每秒发出一次“滴答声” observable B 应该每 5 秒
我有以下流。 Observable.just(Unit) // execute immediately .mergeWith(tryAgainRelay) // execute again w
我可以看到 RX 适用于 Android 和 UI 事件处理。我很难看到 RX 在后端提供了什么好处。 RX Java 是为后端处理而设计的,还是这个概念太过分了? 最佳答案 实际上,RxJava 最
我有一个关于什么是更好的 RxJava 模式以保持可观察状态的问题。 为简单起见,假设我们有一个 StateManager需要跟踪系统中某个状态(假设它是一个简单的 bool 标志)并以可观察的方式公
我遇到了以下问题。我有一个从具有 isMember 属性的服务器请求的基本对象。如果该属性为 true,我想向端点 1、2 和 3 发出请求。 如果该属性为 false,我只想向端点 1 发出请求。
我正在使用 RxJava,但也许能够翻译另一个实现的答案。 我有一个 Observable 发出一系列项目,我想将它们分成 10 组。如下所示: observable .buffer(10)
有没有办法计算 RxAndroid 中流中已处理的元素数量? 我有类似的事情: Observable.fromArray(new String[]{"these", "are", "my", "val
如果我的客户端断开连接(错误)超过 10 秒,我会尝试显示一个弹出窗口。但是,当重新连接时,我也会关闭弹出窗口(true)。如果为真,我需要立即关闭弹出窗口。 我认为我需要做的是根据值(假)进行去抖,
收到 onNext() 后如何自动退订? 现在我使用这个代码: rxObservable .compose(bindToLifecycle()) // unsubscribe automaticall
我在 Ettus x310 上有一个简单的 C++ 测试程序,以前可以用,但现在不行了。我试图简单地设置单个 USRP 的两个 channel 的两个中心频率。当我尝试在第二个 channel 上设置
我有一个返回 Observable 的网络调用,我有另一个网络调用,它不是依赖于第一个 Observable 的 rx,我需要以某种方式转换这一切都与 Rx 相关。 Observable respon
我正在尝试根据 Rxjava 中的某些条件创建组列表。 以下是我的回复: { "dates":[ { "date":18, "value":
RX 6800 XT是最近很强的一款显卡,让很多的用户都很喜爱,详细还有很多没有入手的用户对他不是很了解吧,下面就带来了RX 6800 XT评测、跑分、价格、参数、图片,快来一起看看吧。 RX
我有一个重复元素列表,比如: Observable source = Observable.just("A", "B", "A", "C", "C", "A"); 我想按它们的值以及它们出现的次数对它
如果我将 RX-main 包添加到 WPF 应用程序,我会安装以下依赖包: 接收接口(interface) 接收核心 Rx-Linq Rx 平台服务 主要接收 添加 Rx-main 后,可以安装两个与
com.myapp.test.debug E/MessageQueue-JNI:rx.exceptions.OnErrorNotImplementedException 在 rx.Observable
这个问题在这里已经有了答案: RxJava operator that emits if different from last (1 个回答) 关闭 5 年前。 我有一个 bool 型 Obser
首先,我在两个 shell 中同时运行 vnstat -tr -i eth0 和 iftop。奇怪的是,vnstat 中的 rx 比 iftop 中的峰值 rx 大得多。我该如何解决这个问题? vns
我是一名优秀的程序员,十分优秀!