gpt4 book ai didi

rx-java - RxJava,一个可观察多个订阅者 : publish(). autoConnect()

转载 作者:行者123 更新时间:2023-12-02 06:58:21 29 4
gpt4 key购买 nike

我正在使用 rxJava/rxAndroid,但有一些非常基本的东西没有按照我的预期运行。我有一个可观察对象和两个订阅者:

Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());

Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));

Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));

这是输出:

D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3

现在,我知道我可以通过使用 publish().autoConnect() 来避免重复计数,但我首先尝试了解这种默认行为。每次有人订阅可观察对象时,它就会开始发出数字序列。我明白了。因此,当订阅者 1 连接时,它开始发出项目。 订阅者 2 立即连接,为什么它没有也获取值?

我是这样理解的,从可观察的角度来看:

  1. 有人订阅了我,我应该开始发布项目
    [订阅者:1][要发出的项目:1,2,3]

  2. 将项目“1”发送给订阅者
    [订阅者:1][要发出的项目:2,3]

  3. 有人订阅了我,完成后我会再次发出 1,2,3
    [订阅者:1 和 2][要发出的项目:2,3,1,2,3]

  4. 将项目“2”发送给订阅者
    [订阅者:1 和 2][要发出的项目:3,1,2,3]

  5. 将项目“3”发送给订阅者
    [订阅者:1 和 2][要发出的项目:1,2,3]

  6. 将项目“1”发送给订阅者
    [订阅者:1 和 2][要发出的项目:2,3]

  7. ...

但这不是它的工作原理。就好像它们是两个独立的可观察量合而为一。这让我很困惑,为什么他们不把这些元素提供给所有订阅者?

奖金:

publish().autoConnect() 如何解决该问题?让我们来分解一下。 publish() 给了我一个可连接的可观察对象。可连接的可观察量就像常规的可观察量一样,但您可以告诉它何时连接。然后我继续通过调用 autoConnect()

告诉它立即连接

通过这样做...我不会得到与开始时相同的结果吗?一个普通的正则可观察量。 运营商似乎相互取消。

我可以闭嘴并使用publish().autoconnect()。但我想更多地了解可观察量的工作原理。

谢谢!

最佳答案

这是因为实际上这是两个独立的可观察量。当您调用 subscribe() 时,它们就会“生成”。因此,您提供的步骤是不正确的,因为步骤 3 和 4 只是 1 和 2,但基于不同的可观察值。

但是由于日志记录发生的线程,您将它们视为 1 1 1 2 2 2。如果您要删除 observeOn() 部分,那么您会看到交织方式的排放。要查看下面的运行代码:

@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
Observable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation());
//.observeOn(single);

dataStream.subscribe(i -> System.out.println("1 " + Thread.currentThread().getName() + " " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + Thread.currentThread().getName() + " " + (i - l)));

Thread.sleep(1000);
}

输出,至少在我的运行中是(注意线程名称):

1  RxComputationThreadPool-1 135376988
2 RxComputationThreadPool-2 135376988
1 RxComputationThreadPool-1 135486815
2 RxComputationThreadPool-2 135537383
1 RxComputationThreadPool-1 135560691
2 RxComputationThreadPool-2 135617580

如果你应用observeOn(),它就会变成:

1  RxSingleScheduler-1 186656395
1 RxSingleScheduler-1 187919407
1 RxSingleScheduler-1 187923753
2 RxSingleScheduler-1 186656790
2 RxSingleScheduler-1 187860148
2 RxSingleScheduler-1 187864889

正如您所正确指出的,要获得您想要的内容,您需要 publish().refcount() 或简单地 share() (它是一个别名)运算符。

这是因为 publish() 创建了一个 ConnectableObservable,它不会开始发射项目,直到通过 connect() 被告知这样做方法。在这种情况下,如果您这样做:

@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
ConnectableObservable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation())
.observeOn(single)
.publish();

dataStream.subscribe(i -> System.out.println("1 " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + (i - l)));

Thread.sleep(1000);
dataStream.connect();
Thread.sleep(1000);

}

您会注意到,在第一秒(第一个 Thread.sleep() 调用)没有任何反应,并且在调用 dataStream.connect() 后排放发生。

refCount() 接受 ConnectableObservable,并通过计算当前订阅的订阅者数量来向订阅者隐藏调用 connect() 的需要。它的作用是在第一次订阅时调用 connect() ,并在最后一次取消订阅后取消原始可观察值的订阅。

至于publish().autoConnect()的相互取消,之后你确实得到了一个observable,但它有一个特殊的属性,假设原始的observable通过互联网进行API调用(持续 10 秒),当您在不使用 share() 的情况下使用它时,您最终会对服务器进行与这 10 秒内的订阅数量一样多的并行查询。另一方面,使用 share() 您将只有一次调用。

如果共享的可观察量非常快地完成其工作(例如 just(1,2,3)),您将看不到它的任何好处。

autoConnect()/refCount() 为您提供一个您订阅的中间可观察值,而不是原始可观察值。

如果您有兴趣深入了解这本书:Reactive Programming with RxJava

关于rx-java - RxJava,一个可观察多个订阅者 : publish(). autoConnect(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41915738/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com