- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 rxJava/rxAndroid,但有一些非常基本的东西没有按照我的预期运行。我有一个可观察对象和两个订阅者:
Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));
Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
这是输出:
D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
现在,我知道我可以通过使用 publish().autoConnect()
来避免重复计数,但我首先尝试了解这种默认行为。每次有人订阅可观察对象时,它就会开始发出数字序列。我明白了。因此,当订阅者 1
连接时,它开始发出项目。 订阅者 2
立即连接,为什么它没有也获取值?
我是这样理解的,从可观察的角度来看:
有人订阅了我,我应该开始发布项目
[订阅者:1][要发出的项目:1,2,3]
将项目“1”发送给订阅者
[订阅者:1][要发出的项目:2,3]
有人订阅了我,完成后我会再次发出 1,2,3
[订阅者:1 和 2][要发出的项目:2,3,1,2,3]
将项目“2”发送给订阅者
[订阅者:1 和 2][要发出的项目:3,1,2,3]
将项目“3”发送给订阅者
[订阅者:1 和 2][要发出的项目:1,2,3]
将项目“1”发送给订阅者
[订阅者:1 和 2][要发出的项目:2,3]
...
但这不是它的工作原理。就好像它们是两个独立的可观察量合而为一。这让我很困惑,为什么他们不把这些元素提供给所有订阅者?
奖金:
publish().autoConnect()
如何解决该问题?让我们来分解一下。 publish()
给了我一个可连接的可观察对象。可连接的可观察量就像常规的可观察量一样,但您可以告诉它何时连接。然后我继续通过调用 autoConnect()
通过这样做...我不会得到与开始时相同的结果吗?一个普通的正则可观察量。 运营商似乎相互取消。
我可以闭嘴并使用publish().autoconnect()
。但我想更多地了解可观察量的工作原理。
谢谢!
最佳答案
这是因为实际上这是两个独立的可观察量。当您调用 subscribe()
时,它们就会“生成”。因此,您提供的步骤是不正确的,因为步骤 3 和 4 只是 1 和 2,但基于不同的可观察值。
但是由于日志记录发生的线程,您将它们视为 1 1 1 2 2 2。如果您要删除 observeOn()
部分,那么您会看到交织方式的排放。要查看下面的运行代码:
@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
Observable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation());
//.observeOn(single);
dataStream.subscribe(i -> System.out.println("1 " + Thread.currentThread().getName() + " " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + Thread.currentThread().getName() + " " + (i - l)));
Thread.sleep(1000);
}
输出,至少在我的运行中是(注意线程名称):
1 RxComputationThreadPool-1 135376988
2 RxComputationThreadPool-2 135376988
1 RxComputationThreadPool-1 135486815
2 RxComputationThreadPool-2 135537383
1 RxComputationThreadPool-1 135560691
2 RxComputationThreadPool-2 135617580
如果你应用observeOn()
,它就会变成:
1 RxSingleScheduler-1 186656395
1 RxSingleScheduler-1 187919407
1 RxSingleScheduler-1 187923753
2 RxSingleScheduler-1 186656790
2 RxSingleScheduler-1 187860148
2 RxSingleScheduler-1 187864889
正如您所正确指出的,要获得您想要的内容,您需要 publish().refcount()
或简单地 share()
(它是一个别名)运算符。
这是因为 publish()
创建了一个 ConnectableObservable
,它不会开始发射项目,直到通过 connect()
被告知这样做方法。在这种情况下,如果您这样做:
@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
ConnectableObservable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation())
.observeOn(single)
.publish();
dataStream.subscribe(i -> System.out.println("1 " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + (i - l)));
Thread.sleep(1000);
dataStream.connect();
Thread.sleep(1000);
}
您会注意到,在第一秒(第一个 Thread.sleep()
调用)没有任何反应,并且在调用 dataStream.connect()
后排放发生。
refCount()
接受 ConnectableObservable,并通过计算当前订阅的订阅者数量来向订阅者隐藏调用 connect()
的需要。它的作用是在第一次订阅时调用 connect()
,并在最后一次取消订阅后取消原始可观察值的订阅。
至于publish().autoConnect()
的相互取消,之后你确实得到了一个observable,但它有一个特殊的属性,假设原始的observable通过互联网进行API调用(持续 10 秒),当您在不使用 share()
的情况下使用它时,您最终会对服务器进行与这 10 秒内的订阅数量一样多的并行查询。另一方面,使用 share()
您将只有一次调用。
如果共享的可观察量非常快地完成其工作(例如 just(1,2,3)
),您将看不到它的任何好处。
autoConnect()
/refCount()
为您提供一个您订阅的中间可观察值,而不是原始可观察值。
如果您有兴趣深入了解这本书:Reactive Programming with RxJava
关于rx-java - RxJava,一个可观察多个订阅者 : publish(). autoConnect(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41915738/
参见 BluetoothDevice.connectGatt() . autoConnect的描述是 boolean: Whether to directly connect to the remot
我正在使用 rxJava/rxAndroid,但有一些非常基本的东西没有按照我的预期运行。我有一个可观察对象和两个订阅者: Observable dataStream = Observable.jus
我有一个 fragment ,我在其中执行 API 请求。该 fragment 可以作为 fragment 替换的结果进入堆栈,然后再返回。那一次因为只重新创建了 View ,所以我不想重新执行 Ap
根据我的观点,我认为 share() 和 publish().autoConnect() 是一样的。但是在这段代码中,结果是不一样的 Observable cold = Observable.crea
我的目标是在低功耗蓝牙设备和手机之间建立自动连接。我按照示例代码进行操作,找到了这一行 // We want to directly connect to the device, so we are
我想创建一个可观察量,仅当新值与前一个值不同时,它才会从底层热可观察量(之前以 -1 开始)发出值。此外,我希望最新的值(value)能够立即发送给新订阅者。我想出了以下代码: PublishSubj
我必须为可观察结果提供一个短期缓存。 查看选项,我看到以下内容: 缓存 replay(1).refCount()当数据准备好时,缓存实际值。缓存检索将检查实际数据并执行 Observable.just
我正在开发 Android 和 BLE。我希望应用在 BLE 设备 断开连接但返回范围内并播放广告后自动重新连接 到BLE 设备。 我使用以下代码连接到 BLE 设备: public void con
我是一名优秀的程序员,十分优秀!