gpt4 book ai didi

java - RxJava 在可连接的 observable 上重新连接不起作用

转载 作者:行者123 更新时间:2023-12-02 11:55:42 24 4
gpt4 key购买 nike

嗨,大家好。我正在尝试使用 RxJava 制作某种 MVC。所以想法是进行一些将始终被订阅的持续订阅一些可观察到的。此外,这个可观察对象可以随时重新启动以重新运行,例如网络调用。我尝试使用以下代码来测试此功能:

class Main {
companion object {
@JvmStatic
fun main(args: Array<String>) {
val obs = Observable.interval(1, TimeUnit.SECONDS, Schedulers.io())
.publish()

val s1 = obs
.doOnUnsubscribe { System.out.println("s1 unsubscribed") }
.subscribe { System.out.println("first: $it") }

val s = obs.connect()

Thread.sleep(4000)

System.out.println("unsubscribe")
s.unsubscribe()

Thread.sleep(1000)

System.out.println("connect")
val obsS2 = obs.connect()

System.out.println("isUnsubscribed: ${s1.isUnsubscribed}")

Thread.sleep(10000)
}
}
}

这就是我所期望的:

first: 0
first: 1
first: 2
unsubscribe
connect
isUnsubscribed: false
first: 0
first: 1
...
Process finished with exit code 0

这是实际结果:

first: 0
first: 1
first: 2
unsubscribe
connect
isUnsubscribed: false

Process finished with exit code 0

我发现了一些帖子( RxJava - ConnectableObservable, disconnecting and reconnectinghttps://github.com/Froussios/Intro-To-RxJava/issues/18 ),人们说这是一个错误,但这个错误已经存在了很长一段时间。

那么问题来了:这真的是一个错误吗?如果没有,我怎样才能实现这样的行为?

编辑:在版本 1.3.4、1.2.10、1.1.10、1.0.10 上测试

最佳答案

这不是一个错误,而是 RxJava 的 ConnectableObservable 的一个属性:如果您取消订阅连接,之前订阅的消费者将被弹出,并且不会收到任何进一步的事件,即使重新连接时也是如此。

您可以通过使用 PublishSubject 实现与 Rx.NET 行为类似的效果,然后订阅和取消订阅实际源:

PublishSubject subject = PublishSubject.create();

subject.subscribe(System.out::println);

Subscription s = source.subscribe(subject::onNext);

s.unsubscribe();

Subscription s2 = source.subscribe(subject::onNext);

关于java - RxJava 在可连接的 observable 上重新连接不起作用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47618893/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com