- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我正在尝试使用 Rx 并行处理项目。看来我不能告诉 Rx 并行运行我的观察者的 OnNext() 。下面是测试代码来演示
[Test]
public void ObservableObserveOnNewThreadRunsInParallel()
{
Console.WriteLine("Starting thread: {0}", Thread.CurrentThread.ManagedThreadId);
// store items as they are output
var list = new List<Tuple<string, int, int, int, TimeSpan>>();
// used to wait until a sequences are complete
var ev = new AutoResetEvent(false);
// try these schedulers
var schedulers = new[] {
Tuple.Create("ThreadPoolScheduler.Instance", (IScheduler)ThreadPoolScheduler.Instance),
Tuple.Create("NewThreadScheduler.Default", (IScheduler)NewThreadScheduler.Default),
Tuple.Create("TaskPoolScheduler.Default", (IScheduler)TaskPoolScheduler.Default),
Tuple.Create("Scheduler.Default", (IScheduler)Scheduler.Default),
Tuple.Create("Scheduler.Immediate", (IScheduler)Scheduler.Immediate),
};
// try each scheduler
foreach (var schedulerTuple in
schedulers) {
// emit tuples <i, delay> where delay decreases with each new tuple
// such that output timing is expected to be reversed
var observable =
Observable.Range(0, 5)
.Select(i => Tuple.Create((int)i, (int)(500 - i * 100)))
.Take(5);
var dt = DateTime.Now;
Tuple<string, IScheduler> scheduler = schedulerTuple;
observable
// specify the scheduler to use
.ObserveOn(schedulerTuple.Item2)
.Subscribe(
v => {
// emulate some work (first items take longer than last items)
Thread.Sleep(v.Item2);
// record when the item is done recording
lock (list)
list.Add(
Tuple.Create(
scheduler.Item1,
v.Item1,
v.Item2,
Thread.CurrentThread.ManagedThreadId,
dt - DateTime.Now));
},
// let the test go on
() => ev.Set());
// wait until the end of the sequence
ev.WaitOne();
}
// print observed order
foreach (var i in list) {
Console.WriteLine(i);
}
}
输出:
Starting thread: 5
(ThreadPoolScheduler.Instance, 0, 500, 9, -00:00:04.2514251)
(ThreadPoolScheduler.Instance, 1, 400, 9, -00:00:04.6524652)
(ThreadPoolScheduler.Instance, 2, 300, 9, -00:00:04.9524952)
(ThreadPoolScheduler.Instance, 3, 200, 9, -00:00:05.1525152)
(ThreadPoolScheduler.Instance, 4, 100, 9, -00:00:05.2525252)
(NewThreadScheduler.Default, 0, 500, 11, -00:00:06.5066506)
(NewThreadScheduler.Default, 1, 400, 11, -00:00:06.9066906)
(NewThreadScheduler.Default, 2, 300, 11, -00:00:07.2067206)
(NewThreadScheduler.Default, 3, 200, 11, -00:00:07.4067406)
(NewThreadScheduler.Default, 4, 100, 11, -00:00:07.5067506)
(TaskPoolScheduler.Default, 0, 500, 12, -00:00:00.5020502)
(TaskPoolScheduler.Default, 1, 400, 12, -00:00:00.9020902)
(TaskPoolScheduler.Default, 2, 300, 12, -00:00:01.2021202)
(TaskPoolScheduler.Default, 3, 200, 12, -00:00:01.4021402)
(TaskPoolScheduler.Default, 4, 100, 12, -00:00:01.5021502)
(Scheduler.Default, 0, 500, 13, -00:00:00.5020502)
(Scheduler.Default, 1, 400, 13, -00:00:00.9020902)
(Scheduler.Default, 2, 300, 13, -00:00:01.2021202)
(Scheduler.Default, 3, 200, 13, -00:00:01.4021402)
(Scheduler.Default, 4, 100, 13, -00:00:01.5021502)
(Scheduler.Immediate, 0, 500, 5, -00:00:00.5020502)
(Scheduler.Immediate, 1, 400, 5, -00:00:00.9040904)
(Scheduler.Immediate, 2, 300, 5, -00:00:01.2041204)
(Scheduler.Immediate, 3, 200, 5, -00:00:01.4041404)
(Scheduler.Immediate, 4, 100, 5, -00:00:01.5041504)
请注意,即使我明确使用 ObserveOn()
来指定用于通知的调度程序,但每个 OnNext 调用似乎都在等待上一个调用。
我希望除 Scheduler.Immediate 之外的所有通知都能并行运行。
有人知道我做错了什么吗?
最佳答案
这是设计使然。 Rx 的主要契约之一是所有通知都必须序列化。
参见 Rx Design Guidelines 中的 §§4.2、6.7 .
Observables 表示 Rx 中的并发性,因此要有重叠的通知需要两个或更多的 observables。通知不会在同一个观察者中重叠,但它们会针对每个观察者重叠。
例如,如果您需要同时执行两个方法(观察者),那么您需要定义两个可观察对象。
从技术上讲,并发所需的是观察者(订阅)而不是可观察对象;因此,订阅同一个 cold observable 两次可以产生并发,这取决于 observable 使用的调度程序;然而,订阅同一个 hot observable 两次不会导致并发。 (请参阅我的博文:Hot and Cold Observables。)
ObserveOn 在通过引入并发的调度程序时引入并发。但它如何在不违反 §6.7 契约(Contract)的情况下做到这一点?好吧,它将 observable 分成两个 observable:before operator 和after operator!或者,您可以将其视为两个订阅或观察者:之前 和之后。 before 观察者是 ObserveOn 提供的内部观察者。 after 观察者是您的观察者,或者是查询中下一个运算符提供的观察者。
无论您如何看待,之前 可观察对象中的通知可以与之后 可观察对象中的通知同时发生。但是 after 观察者只会在 after 可观察者的上下文中接收序列化通知。
关于c# - Observable.ObserveOn() 似乎没有效果,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27549737/
我有一个 View 模型,它在与 UI 线程分开的线程上触发各种对象的可观察对象。同时,有订阅这些可观察对象的 View 代码,需要更新 UI 线程上的控件。 就编码风格而言,使用 ObserveOn
我是 RX 的新手,正在尝试一些示例,关于为什么订阅中的 Console.writeLine 没有被调用有什么想法吗? var obs = Observable.Create(i => {
在 Scala 中,我编写了两个 MongoDB 可观察对象,并在传递自定义执行上下文时调用了 observeOn。在第一个可观察对象上调用 observeOn,但自定义执行上下文不会传播到第二个可观
我正在尝试使用 Rx 并行处理项目。看来我不能告诉 Rx 并行运行我的观察者的 OnNext() 。下面是测试代码来演示 [Test] public void ObservableObserveOnN
我刚刚发现了 SubscribeOn,这让我想知道我是否应该使用它而不是 ObserveOn。谷歌带了我here和 here ,但两者都没有帮助我理解差异:它看起来非常微妙。 (在我的上下文中,我在非
基于阅读这个问题:What's the difference between SubscribeOn and ObserveOnObserveOn 设置代码在 Subscribe 处理程序中的位置被执
我有以下代码 Observable.just(10) .doOnTerminate(() -> Log.d("LOG", "ON TERMINATE"))
我正在尝试使用 .net Observable 类实现一个简单的观察者模式。我有这样的代码: Observable.FromEventPattern( Instance.User, "
我对在可观察对象上调用 subscribeOn 和 observeOn 方法的顺序有点困惑。我读了几篇文章,一个人说没关系,只是在他的例子中使用了东西,其他人说这很重要。所以这是我的问题: 例如: s
我正在开发华为 HarmonyOS 应用程序,我正在尝试使用 RxJava 为后台任务实现一个基类。我的问题是我不知道如何在主线程上观察。 在常规 Android 上,我会使用 AndroidSche
Scheduling and Threading Intro to Rx 部分说 the use of SubscribeOn and ObserveOn should only be invoked
也许我只是真正了解 subscribeOn 和 observeOn 的内部工作原理,但我最近遇到了一些非常奇怪的事情。我的印象是,subscribeOn 决定了调度程序最初开始处理的位置(特别是当我们
请看下面的代码: var obs = Observable.Start(() => LongRunningMethodToRetrieveData()); obs.Subscribe(x => M
基本上我的 Android 应用程序有一些元数据需要在不同的情况下报告给后端服务器: data class SearchMetaData( val searchId: String?,
我已经声明: Subject mBehaviorSubject = BehaviorSubject.createDefault("default").toSerialized(); 似乎工作正常。但我
有人可以帮助解释为什么当我“阻塞并继续”观察者的 onNext 序列订阅了一个具有时间可观察序列的缓冲区时,Scheduler.NewThread 不再适用吗? 例如: 如果我通过缓冲一个数字序列 v
我遇到了一个问题,我的可观察对象在 IO 线程上订阅并在 android 主 (UI) 线程上观察,但 doFinally 运算符在 IO 线程上运行,它需要在 UI 线程上运行。 用例几乎和这个me
我有一个调用 Web 服务的方法,我认为它在 IO 线程上运行,直到服务停止并且 UI 卡住。 所以我开始了一些简单的测试来检查线程 implementation 'io.reactivex.rxja
当我使用 RxAndroid 和 .observeOn(AndroidSchedulers.mainThread()),并使用 Android Studio 在模拟器上运行测试时,整个测试运行崩溃:
我有一个像这样的简单流: Observable.error(Exception()).startWith(1).subscribe { println("Item is $it")
我是一名优秀的程序员,十分优秀!