- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在尝试了解如何使用 Rx 将多个可观察事件流式传输到一组事件中。但是当我运行下面的代码时出现异常。那么这是否意味着多个观察者总是因为违反Rx语法而容易出现异常呢?我的意思是,如果这些多个观察者中的两个偶然同时产生一个事件(任何两个可观察者都有一定的概率同时产生),它应该给出一个异常(exception)。
DateTimeOffset start;
object sync = new object();
var subject = new Subject<long>();
var observer = Observer.Create<long>(c =>
{
lock (sync)
{
Console.WriteLine(c);
}
})
;
var observable1 = Observable.Interval(TimeSpan.FromSeconds(2));
var observable2 = Observable.Interval(TimeSpan.FromSeconds(5));
var observable3 = Observable.Never<long>().Timeout
(start = DateTimeOffset.Now.AddSeconds(15),
(new long[] { 1 }).ToObservable());
var observable4 = Observable.Never<long>().Timeout(start);
observable1.Subscribe(observer);
observable2.Subscribe(observer);
observable3.Subscribe(observer);
observable4.Subscribe(observer);
Thread.Sleep(20000);
感谢 Gideon 的解释。这是我得到的异常(exception)。你是对的,这是一个超时异常。这是一个编码错误。谢谢。
System.TimeoutException: The operation has timed out.
at System.Reactive.Observer.<Create>b__8[T](Exception e)
at System.Reactive.AnonymousObserver`1.Error(Exception exception)
at System.Reactive.AbstractObserver`1.OnError(Exception error)
at System.Reactive.Subjects.Subject`1.OnError(Exception error)
at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e
xception)
at System.Reactive.AbstractObserver`1.OnError(Exception error)
at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e
xception)
at System.Reactive.AbstractObserver`1.OnError(Exception error)
at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<>c__DisplayClass28
e.<Throw>b__28b()
at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
at System.Reactive.Concurrency.ImmediateScheduler.Schedule[TState](TState sta
te, Func`3 action)
at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio
n action)
at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<Throw>b__28a(IObse
rver`1 observer)
at System.Reactive.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()
at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
at System.Reactive.Concurrency.ScheduledItem`1.Invoke()
at System.Reactive.Concurrency.CurrentThreadScheduler.Trampoline.Run()
at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState
state, TimeSpan dueTime, Func`3 action)
at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState
state, Func`3 action)
at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio
n action)
at System.Reactive.AnonymousObservable`1.Subscribe(IObserver`1 observer)
at System.Reactive.Linq.Observable.<>c__DisplayClass543`1.<>c__DisplayClass54
5.<Timeout>b__53f()
at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
at System.Reactive.Concurrency.ThreadPoolScheduler.<>c__DisplayClass8`1.<Sche
dule>b__6(Object _)
at System.Threading._TimerCallback.TimerCallback_Context(Object state)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, C
ontextCallback callback, Object state, Boolean ignoreSyncCtx)
at System.Threading._TimerCallback.PerformTimerCallback(Object state)
最佳答案
是的,一个观察者可以监听多个可观察对象。最好的例子就是 Merge
运算符。内置运算符都将遵循 RX 语法,并且通常会在不遵循 RX 语法的源上强制执行。
您从 Observer.Create
获得的 IObserver
就是这样一种情况。一旦调用 OnError 或 OnCompleted,它将忽略以后对 OnNext 的任何调用。这确实意味着使用同一个观察者订阅一个可观察对象,然后在第一个可观察对象之后订阅另一个可观察对象将不起作用,因为来自第一个可观察对象的终止消息将导致观察者忽略来自第二个可观察对象的消息。为了解决这个问题,Merge
、Concat
和 OnErrorResumeNext
(以及其他)等运算符在内部使用多个观察者并且不传递完成消息(OnError和/或 OnCompleted,具体取决于运算符的语义)从除最后一个观察者之外的任何观察者到外部观察者。
你没有提到你得到了什么异常,但我猜这是来自 observable4
的超时错误。如果您不提供另一个用于超时的可观察对象,则会调用观察者的 OnError
,并为 Subscribe
调用默认的 OnError
和不采用错误处理程序的 Observer.Create
重载只是抛出异常。
虽然这显然是示例/测试代码,但我确实想指出,即使您不再将消息传递给 OnNext
,所有其他可观察对象在此异常后仍会继续运行。要么使用 Merge
为您跟踪它,要么跟踪描述中的所有一次性用品,并在收到完成消息时自行处理它们。 CompositeDisposable
(在 System.Reactive.Disposables
中)对此很有帮助。
关于c# - 观察者能否使用 Rx 安全地监听多个可观察对象?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8493291/
我用 Cocoapods 创建了一个简单的项目并安装了 RxSwift 和 RxCocoa。 我写了这段代码: import UIKit import RxSwift class ViewContro
我对 Rx 真的很陌生,只是想知道 subscribeOn 的顺序如何影响 Observable //This will not print anything Observable.just("wha
我有一个 PublishSubject 每 X 秒发出一个信号,我想只考虑 Y 秒后发出的第一个项目。 例子 observable A 每秒发出一次“滴答声” observable B 应该每 5 秒
我有以下流。 Observable.just(Unit) // execute immediately .mergeWith(tryAgainRelay) // execute again w
我可以看到 RX 适用于 Android 和 UI 事件处理。我很难看到 RX 在后端提供了什么好处。 RX Java 是为后端处理而设计的,还是这个概念太过分了? 最佳答案 实际上,RxJava 最
我有一个关于什么是更好的 RxJava 模式以保持可观察状态的问题。 为简单起见,假设我们有一个 StateManager需要跟踪系统中某个状态(假设它是一个简单的 bool 标志)并以可观察的方式公
我遇到了以下问题。我有一个从具有 isMember 属性的服务器请求的基本对象。如果该属性为 true,我想向端点 1、2 和 3 发出请求。 如果该属性为 false,我只想向端点 1 发出请求。
我正在使用 RxJava,但也许能够翻译另一个实现的答案。 我有一个 Observable 发出一系列项目,我想将它们分成 10 组。如下所示: observable .buffer(10)
有没有办法计算 RxAndroid 中流中已处理的元素数量? 我有类似的事情: Observable.fromArray(new String[]{"these", "are", "my", "val
如果我的客户端断开连接(错误)超过 10 秒,我会尝试显示一个弹出窗口。但是,当重新连接时,我也会关闭弹出窗口(true)。如果为真,我需要立即关闭弹出窗口。 我认为我需要做的是根据值(假)进行去抖,
收到 onNext() 后如何自动退订? 现在我使用这个代码: rxObservable .compose(bindToLifecycle()) // unsubscribe automaticall
我在 Ettus x310 上有一个简单的 C++ 测试程序,以前可以用,但现在不行了。我试图简单地设置单个 USRP 的两个 channel 的两个中心频率。当我尝试在第二个 channel 上设置
我有一个返回 Observable 的网络调用,我有另一个网络调用,它不是依赖于第一个 Observable 的 rx,我需要以某种方式转换这一切都与 Rx 相关。 Observable respon
我正在尝试根据 Rxjava 中的某些条件创建组列表。 以下是我的回复: { "dates":[ { "date":18, "value":
RX 6800 XT是最近很强的一款显卡,让很多的用户都很喜爱,详细还有很多没有入手的用户对他不是很了解吧,下面就带来了RX 6800 XT评测、跑分、价格、参数、图片,快来一起看看吧。 RX
我有一个重复元素列表,比如: Observable source = Observable.just("A", "B", "A", "C", "C", "A"); 我想按它们的值以及它们出现的次数对它
如果我将 RX-main 包添加到 WPF 应用程序,我会安装以下依赖包: 接收接口(interface) 接收核心 Rx-Linq Rx 平台服务 主要接收 添加 Rx-main 后,可以安装两个与
com.myapp.test.debug E/MessageQueue-JNI:rx.exceptions.OnErrorNotImplementedException 在 rx.Observable
这个问题在这里已经有了答案: RxJava operator that emits if different from last (1 个回答) 关闭 5 年前。 我有一个 bool 型 Obser
首先,我在两个 shell 中同时运行 vnstat -tr -i eth0 和 iftop。奇怪的是,vnstat 中的 rx 比 iftop 中的峰值 rx 大得多。我该如何解决这个问题? vns
我是一名优秀的程序员,十分优秀!