- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
基于阅读这个问题:What's the difference between SubscribeOn and ObserveOnObserveOn
设置代码在 Subscribe
处理程序中的位置被执行:stream.Subscribe(_ => { // this code here });
SubscribeOn
方法设置在哪个线程上完成流的设置。
我了解到,如果未明确设置这些,则使用 TaskPool。
现在我的问题是,假设我做这样的事情:
Observable.Interval(new Timespan(0, 0, 1))
.Where(t => predicate(t))
.SelectMany(t => lots_of(t))
.ObserveOnDispatcher()
.Subscribe(t => some_action(t));
Where
predicate
和
SelectMany
lots_of
在哪里执行,假设 0x251812431431 正在执行调度程序?
最佳答案
有很多关于 SubscribeOn
和 ObserveOn
的误导性信息。
概括
SubscribeOn
拦截调用的 IObservable<T>
单方法,该方法是Subscribe
,以及由Dispose
返回的IDisposable
handle 调用Subscribe
。 ObserveOn
截取调用的 IObserver<T>
的方法,这是OnNext
,OnCompleted
&OnError
。 ObserveOn sets where the code in the Subscribe handler is executed:
OnNext
处理程序。请记住,的
Subscribe
的
IObservable
方法接受一个
IObserver
有
OnNext
,
OnCompleted
和
OnError
方法,但扩展方法提供了接受lambda表达式, build
IObserver
实现您的方便过载。
Subscribe
时调用的 observable 中的代码。这样,上面的描述更接近于
SubscribeOn
的目的。
SubscribeOn
导致 observable 的
Subscribe
方法在指定的调度程序或上下文上异步执行。当您不想从正在运行的任何线程中对 observable 调用
Subscribe
方法时使用它 - 通常是因为它可以长时间运行并且您不想阻塞调用线程。
Subscribe
时,您正在调用一个可能是一长串 observable 的一部分的 observable。这只是
SubscribeOn
应用于它所影响的可观察量。现在可能会出现这样的情况,即链中的所有 observable 都将立即在同一个线程上订阅——但并非必须如此。以
Concat
为例——它只在前一个流完成后订阅每个连续的流,通常这将发生在前一个流调用
OnCompleted
的任何线程上。
SubscribeOn
位于您对
Subscribe
的调用和您订阅的可观察对象之间,拦截调用并使其异步。
Subscribe
返回一个
IDisposable
句柄,用于取消订阅。
SubscribeOn
确保在提供的调度程序上调度对
Dispose
的调用。
SubscribeOn
的作用时,一个常见的混淆点是
Subscribe
的 observable 处理程序很可能会调用
OnNext
、 0x251812231343214141 上的 0x2518122313432141 线程。但是,其目的不是影响这些调用。在
OnCompleted
方法返回之前完成流的情况并不少见。例如,
OnError
就是这样做的。让我们来看看。
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
Subscribe
异步运行它。我们将监视
Observable.Return
observable 和
SubscribeOn
observable:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.SubscribeOn(Scheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 SubscribeOn: Observable obtained on Thread: 1
04 SubscribeOn: Subscribed to on Thread: 1
05 SubscribeOn: Subscription completed.
06 Subscribe returned
07 Return: Subscribed to on Thread: 2
08 Return: OnNext(1) on Thread: 2
09 SubscribeOn: OnNext(1) on Thread: 2
10 Return: OnCompleted() on Thread: 2
11 SubscribeOn: OnCompleted() on Thread: 2
12 Return: Subscription completed.
Return
observable。我们只是在这里得到
SubscribeOn
,还没有订阅。
Return
observable。
IObservable
的
SubscribeOn
方法。
Subscribe
方法异步完成...
SubscribeOn
将默认调度程序上的调用安排到
Subscribe
。这是在线程 2 上接收的。
SubscribeOn
所做的那样,它在
Return
线程上调用
Return
......
OnNext
现在只是一个通过。
Subscribe
相同
SubscribeOn
订阅处理程序完成。
OnCompleted
的目的和效果!
Return
作为该调用传递的
SubscribeOn
方法拦截到一个不同的线程,然后
SubscribeOn
做相同的工作,但对于
Subscribe
,
ObserveOn
和
OnNext
电话。
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Calling from Thread: 1
Return: Observable obtained on Thread: 1
Return: Subscribed to on Thread: 1
Return: OnNext(1) on Thread: 1
Return: OnCompleted() on Thread: 1
Return: Subscription completed.
Subscribe returned
OnCompleted
:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Return(1).Spy("Return");
source.ObserveOn(Scheduler.Default).Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
01 Calling from Thread: 1
02 Return: Observable obtained on Thread: 1
03 ObserveOn: Observable obtained on Thread: 1
04 ObserveOn: Subscribed to on Thread: 1
05 Return: Subscribed to on Thread: 1
06 Return: OnNext(1) on Thread: 1
07 ObserveOn: OnNext(1) on Thread: 2
08 Return: OnCompleted() on Thread: 1
09 Return: Subscription completed.
10 ObserveOn: Subscription completed.
11 Subscribe returned
12 ObserveOn: OnCompleted() on Thread: 2
OnError
observable。我们只是在这里得到
ObserveOn
,还没有订阅。
Return
observable 也在调用线程上进行评估。
IObservable
observable...
ObserveOn
observable。
ObserveOn
在其
Return
处理程序中调用
Return
。
OnNext
的效果。 我们可以看到
Subscribe
在线程 2 上异步调度。
ObserveOn
在线程 1 上调用
OnNext
...
Return
的订阅处理程序完成...
OnCompleted
的订阅处理程序...
Return
已经将
ObserveOn
的
ObserveOn
调用到线程 2。这可能在 09-11 期间的任何时间发生,因为它是异步运行的。碰巧它现在终于被调用了。
Return
到一个长时间运行的 observable 并希望尽快离开调度程序线程时,您最常会在 GUI 中看到
OnCompleted
使用 - 也许是因为您知道它是那些在订阅中完成所有工作的 observable 之一处理程序。将它应用在 observable 链的末尾,因为这是您订阅时调用的第一个 observable。
SubscribeOn
、
Subscribe
和
ObserveOn
调用被编码回调度程序线程时,您将最常看到
OnNext
在 GUI 中使用。将它应用在可观察链的末尾以尽可能晚地转换回来。
OnCompleted
不会对
OnError
和
ObserveOnDispatcher
执行的线程产生任何影响 - 这一切都取决于调用它们的线程流!流的订阅处理程序将在调用线程上被调用,但是在不知道
Where
是如何实现的情况下,不可能说
SelectMany
和
Where
将在哪里运行。
SelectMany
。
stream
在
Observable.Return
处理程序中完成其流。这不是非典型的,但流比
Return
处理程序存活时间更长的情况同样常见。以
Subscribe
为例:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.Subscribe();
Console.WriteLine("Subscribe returned");
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Subscribe
和
Observable.Timer
稍后在不同的线程上被调用。
OnNext
或
OnCompleted
的组合不会对哪个线程或调度程序
SubscribeOn
选择调用 0x25181223134312411 和 481312411 上的 0x25181223134312411 产生任何影响
ObserveOn
来确定
Timer
线程:
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.SubscribeOn(NewThreadScheduler.Default).Spy("SubscribeOn").Subscribe();
Console.WriteLine("Subscribe returned");
OnNext
以防止在
OnCompleted
发生与
SubscribeOn
相同的线程池线程的情况下发生混淆)
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
SubscribeOn: Observable obtained on Thread: 1
SubscribeOn: Subscribed to on Thread: 1
SubscribeOn: Subscription completed.
Subscribe returned
Timer: Subscribed to on Thread: 2
Timer: Subscription completed.
Timer: OnNext(0) on Thread: 3
SubscribeOn: OnNext(0) on Thread: 3
Timer: OnCompleted() on Thread: 3
SubscribeOn: OnCompleted() on Thread: 3
Subscribe
调用后返回,但是
NewThreadScheduler
订阅获得了自己的线程 (2),但是
Timer
和 0x2518131431 上的 0x2518131431 调用了。
SubscribeOn
,让我们将代码更改为(对于以下代码,请使用 nuget 包 rx-wpf):
var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Subscribe
- 这就像
Timer
,除了它指定我们应该使用
OnNext
的任何线程 0x2518431421 评估
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Timer: OnNext(0) on Thread: 2
ObserveOn: OnNext(0) on Thread: 1
Timer: OnCompleted() on Thread: 2
ObserveOn: OnCompleted() on Thread: 1
OnCompleted
仍在其选择的线程(2)上调用
ObserveOn
和
ObserveOnDispatcher
- 但 0x25181314131313131313
ObserveOn
),您会看到
DispatcherScheduler
会阻塞(此代码在 LINQPad 主方法中效果最佳):
var dispatcher = Dispatcher.CurrentDispatcher;
Console.WriteLine("Calling from Thread: " + Thread.CurrentThread.ManagedThreadId);
var source = Observable.Timer(TimeSpan.FromSeconds(1)).Spy("Timer");
source.ObserveOnDispatcher().Spy("ObserveOn").Subscribe();
Console.WriteLine("Subscribe returned");
Console.WriteLine("Blocking the dispatcher");
Thread.Sleep(2000);
Console.WriteLine("Unblocked");
Calling from Thread: 1
Timer: Observable obtained on Thread: 1
ObserveOn: Observable obtained on Thread: 1
ObserveOn: Subscribed to on Thread: 1
Timer: Subscribed to on Thread: 1
Timer: Subscription completed.
ObserveOn: Subscription completed.
Subscribe returned
Blocking the dispatcher
Timer: OnNext(0) on Thread: 2
Timer: OnCompleted() on Thread: 2
Unblocked
ObserveOn: OnNext(0) on Thread: 1
ObserveOn: OnCompleted() on Thread: 1
ObserveOnDispatcher
的调用只有在
Timer
运行后才能退出。
OnNext
,
OnCompleted
并将特定的调度程序传递给接受它们的运算符(operator)改变这一点。
ObserveOnDispatcher
和
Thread.Sleep
是
decorators,它们将观察者的表面区域和 observables 调用包装到 marshal。希望这些例子已经说明了这一点。
关于c# - ObserveOn 和 SubscribeOn - 工作完成的地方,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/20451939/
我有一个 View 模型,它在与 UI 线程分开的线程上触发各种对象的可观察对象。同时,有订阅这些可观察对象的 View 代码,需要更新 UI 线程上的控件。 就编码风格而言,使用 ObserveOn
我是 RX 的新手,正在尝试一些示例,关于为什么订阅中的 Console.writeLine 没有被调用有什么想法吗? var obs = Observable.Create(i => {
在 Scala 中,我编写了两个 MongoDB 可观察对象,并在传递自定义执行上下文时调用了 observeOn。在第一个可观察对象上调用 observeOn,但自定义执行上下文不会传播到第二个可观
我正在尝试使用 Rx 并行处理项目。看来我不能告诉 Rx 并行运行我的观察者的 OnNext() 。下面是测试代码来演示 [Test] public void ObservableObserveOnN
我刚刚发现了 SubscribeOn,这让我想知道我是否应该使用它而不是 ObserveOn。谷歌带了我here和 here ,但两者都没有帮助我理解差异:它看起来非常微妙。 (在我的上下文中,我在非
基于阅读这个问题:What's the difference between SubscribeOn and ObserveOnObserveOn 设置代码在 Subscribe 处理程序中的位置被执
我有以下代码 Observable.just(10) .doOnTerminate(() -> Log.d("LOG", "ON TERMINATE"))
我正在尝试使用 .net Observable 类实现一个简单的观察者模式。我有这样的代码: Observable.FromEventPattern( Instance.User, "
我对在可观察对象上调用 subscribeOn 和 observeOn 方法的顺序有点困惑。我读了几篇文章,一个人说没关系,只是在他的例子中使用了东西,其他人说这很重要。所以这是我的问题: 例如: s
我正在开发华为 HarmonyOS 应用程序,我正在尝试使用 RxJava 为后台任务实现一个基类。我的问题是我不知道如何在主线程上观察。 在常规 Android 上,我会使用 AndroidSche
Scheduling and Threading Intro to Rx 部分说 the use of SubscribeOn and ObserveOn should only be invoked
也许我只是真正了解 subscribeOn 和 observeOn 的内部工作原理,但我最近遇到了一些非常奇怪的事情。我的印象是,subscribeOn 决定了调度程序最初开始处理的位置(特别是当我们
请看下面的代码: var obs = Observable.Start(() => LongRunningMethodToRetrieveData()); obs.Subscribe(x => M
基本上我的 Android 应用程序有一些元数据需要在不同的情况下报告给后端服务器: data class SearchMetaData( val searchId: String?,
我已经声明: Subject mBehaviorSubject = BehaviorSubject.createDefault("default").toSerialized(); 似乎工作正常。但我
有人可以帮助解释为什么当我“阻塞并继续”观察者的 onNext 序列订阅了一个具有时间可观察序列的缓冲区时,Scheduler.NewThread 不再适用吗? 例如: 如果我通过缓冲一个数字序列 v
我遇到了一个问题,我的可观察对象在 IO 线程上订阅并在 android 主 (UI) 线程上观察,但 doFinally 运算符在 IO 线程上运行,它需要在 UI 线程上运行。 用例几乎和这个me
我有一个调用 Web 服务的方法,我认为它在 IO 线程上运行,直到服务停止并且 UI 卡住。 所以我开始了一些简单的测试来检查线程 implementation 'io.reactivex.rxja
当我使用 RxAndroid 和 .observeOn(AndroidSchedulers.mainThread()),并使用 Android Studio 在模拟器上运行测试时,整个测试运行崩溃:
我有一个像这样的简单流: Observable.error(Exception()).startWith(1).subscribe { println("Item is $it")
我是一名优秀的程序员,十分优秀!