gpt4 book ai didi

c# - 观察者能否使用 Rx 安全地监听多个可观察对象?

转载 作者:太空狗 更新时间:2023-10-30 00:44:11 25 4
gpt4 key购买 nike

我正在尝试了解如何使用 Rx 将多个可观察事件流式传输到一组事件中。但是当我运行下面的代码时出现异常。那么这是否意味着多个观察者总是因为违反Rx语法而容易出现异常呢?我的意思是,如果这些多个观察者中的两个偶然同时产生一个事件(任何两个可观察者都有一定的概率同时产生),它应该给出一个异常(exception)。

DateTimeOffset start;
object sync = new object();
var subject = new Subject<long>();
var observer = Observer.Create<long>(c =>
{
lock (sync)
{
Console.WriteLine(c);
}
})
;

var observable1 = Observable.Interval(TimeSpan.FromSeconds(2));
var observable2 = Observable.Interval(TimeSpan.FromSeconds(5));
var observable3 = Observable.Never<long>().Timeout
(start = DateTimeOffset.Now.AddSeconds(15),
(new long[] { 1 }).ToObservable());
var observable4 = Observable.Never<long>().Timeout(start);
observable1.Subscribe(observer);
observable2.Subscribe(observer);
observable3.Subscribe(observer);
observable4.Subscribe(observer);
Thread.Sleep(20000);

感谢 Gideon 的解释。这是我得到的异常(exception)。你是对的,这是一个超时异常。这是一个编码错误。谢谢。

System.TimeoutException: The operation has timed out.
at System.Reactive.Observer.<Create>b__8[T](Exception e)
at System.Reactive.AnonymousObserver`1.Error(Exception exception)
at System.Reactive.AbstractObserver`1.OnError(Exception error)
at System.Reactive.Subjects.Subject`1.OnError(Exception error)
at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e
xception)
at System.Reactive.AbstractObserver`1.OnError(Exception error)
at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e
xception)
at System.Reactive.AbstractObserver`1.OnError(Exception error)
at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<>c__DisplayClass28
e.<Throw>b__28b()
at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
at System.Reactive.Concurrency.ImmediateScheduler.Schedule[TState](TState sta
te, Func`3 action)
at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio
n action)
at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<Throw>b__28a(IObse
rver`1 observer)
at System.Reactive.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0()

at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore()
at System.Reactive.Concurrency.ScheduledItem`1.Invoke()
at System.Reactive.Concurrency.CurrentThreadScheduler.Trampoline.Run()
at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState
state, TimeSpan dueTime, Func`3 action)
at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState
state, Func`3 action)
at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio
n action)
at System.Reactive.AnonymousObservable`1.Subscribe(IObserver`1 observer)
at System.Reactive.Linq.Observable.<>c__DisplayClass543`1.<>c__DisplayClass54
5.<Timeout>b__53f()
at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action
action)
at System.Reactive.Concurrency.ThreadPoolScheduler.<>c__DisplayClass8`1.<Sche
dule>b__6(Object _)
at System.Threading._TimerCallback.TimerCallback_Context(Object state)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, C
ontextCallback callback, Object state, Boolean ignoreSyncCtx)
at System.Threading._TimerCallback.PerformTimerCallback(Object state)

最佳答案

是的,一个观察者可以监听多个可观察对象。最好的例子就是 Merge 运算符。内置运算符都将遵循 RX 语法,并且通常会在不遵循 RX 语法的源上强制执行。

您从 Observer.Create 获得的 IObserver 就是这样一种情况。一旦调用 OnError 或 OnCompleted,它将忽略以后对 OnNext 的任何调用。这确实意味着使用同一个观察者订阅一个可观察对象,然后在第一个可观察对象之后订阅另一个可观察对象将不起作用,因为来自第一个可观察对象的终止消息将导致观察者忽略来自第二个可观察对象的消息。为了解决这个问题,MergeConcatOnErrorResumeNext(以及其他)等运算符在内部使用多个观察者并且不传递完成消息(OnError和/或 OnCompleted,具体取决于运算符的语义)从除最后一个观察者之外的任何观察者到外部观察者。

你没有提到你得到了什么异常,但我猜这是来自 observable4 的超时错误。如果您不提供另一个用于超时的可观察对象,则会调用观察者的 OnError,并为 Subscribe 调用默认的 OnError 和不采用错误处理程序的 Observer.Create 重载只是抛出异常。

虽然这显然是示例/测试代码,但我确实想指出,即使您不再将消息传递给 OnNext,所有其他可观察对象在此异常后仍会继续运行。要么使用 Merge 为您跟踪它,要么跟踪描述中的所有一次性用品,并在收到完成消息时自行处理它们。 CompositeDisposable(在 System.Reactive.Disposables 中)对此很有帮助。

关于c# - 观察者能否使用 Rx 安全地监听多个可观察对象?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8493291/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com