- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在使用 rxJava/rxAndroid,但有一些非常基本的东西没有按照我的预期运行。我有一个可观察对象和两个订阅者:
Observable<Integer> dataStream = Observable.just(1, 2, 3).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread());
Log.d(TAG, "subscribing sub1...");
dataStream.subscribe(v -> System.out.println("Subscriber #1: "+ integer));
Log.d(TAG, "subscribing sub2...");
dataStream.subscribe(v -> System.out.println("Subscriber #2: "+ integer));
这是输出:
D/RxJava: subscribing sub1...
D/RxJava: subscribing sub2...
D/RxJava: Subscriber #1: 1
D/RxJava: Subscriber #1: 2
D/RxJava: Subscriber #1: 3
D/RxJava: Subscriber #2: 1
D/RxJava: Subscriber #2: 2
D/RxJava: Subscriber #2: 3
现在,我知道我可以通过使用 publish().autoConnect()
来避免重复计数,但我首先尝试了解这种默认行为。每次有人订阅可观察对象时,它就会开始发出数字序列。我明白了。因此,当订阅者 1
连接时,它开始发出项目。 订阅者 2
立即连接,为什么它没有也获取值?
我是这样理解的,从可观察的角度来看:
有人订阅了我,我应该开始发布项目
[订阅者:1][要发出的项目:1,2,3]
将项目“1”发送给订阅者
[订阅者:1][要发出的项目:2,3]
有人订阅了我,完成后我会再次发出 1,2,3
[订阅者:1 和 2][要发出的项目:2,3,1,2,3]
将项目“2”发送给订阅者
[订阅者:1 和 2][要发出的项目:3,1,2,3]
将项目“3”发送给订阅者
[订阅者:1 和 2][要发出的项目:1,2,3]
将项目“1”发送给订阅者
[订阅者:1 和 2][要发出的项目:2,3]
...
但这不是它的工作原理。就好像它们是两个独立的可观察量合而为一。这让我很困惑,为什么他们不把这些元素提供给所有订阅者?
奖金:
publish().autoConnect()
如何解决该问题?让我们来分解一下。 publish()
给了我一个可连接的可观察对象。可连接的可观察量就像常规的可观察量一样,但您可以告诉它何时连接。然后我继续通过调用 autoConnect()
通过这样做...我不会得到与开始时相同的结果吗?一个普通的正则可观察量。 运营商似乎相互取消。
我可以闭嘴并使用publish().autoconnect()
。但我想更多地了解可观察量的工作原理。
谢谢!
最佳答案
这是因为实际上这是两个独立的可观察量。当您调用 subscribe()
时,它们就会“生成”。因此,您提供的步骤是不正确的,因为步骤 3 和 4 只是 1 和 2,但基于不同的可观察值。
但是由于日志记录发生的线程,您将它们视为 1 1 1 2 2 2。如果您要删除 observeOn()
部分,那么您会看到交织方式的排放。要查看下面的运行代码:
@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
Observable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation());
//.observeOn(single);
dataStream.subscribe(i -> System.out.println("1 " + Thread.currentThread().getName() + " " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + Thread.currentThread().getName() + " " + (i - l)));
Thread.sleep(1000);
}
输出,至少在我的运行中是(注意线程名称):
1 RxComputationThreadPool-1 135376988
2 RxComputationThreadPool-2 135376988
1 RxComputationThreadPool-1 135486815
2 RxComputationThreadPool-2 135537383
1 RxComputationThreadPool-1 135560691
2 RxComputationThreadPool-2 135617580
如果你应用observeOn()
,它就会变成:
1 RxSingleScheduler-1 186656395
1 RxSingleScheduler-1 187919407
1 RxSingleScheduler-1 187923753
2 RxSingleScheduler-1 186656790
2 RxSingleScheduler-1 187860148
2 RxSingleScheduler-1 187864889
正如您所正确指出的,要获得您想要的内容,您需要 publish().refcount()
或简单地 share()
(它是一个别名)运算符。
这是因为 publish()
创建了一个 ConnectableObservable
,它不会开始发射项目,直到通过 connect()
被告知这样做方法。在这种情况下,如果您这样做:
@Test
public void test() throws InterruptedException {
final Scheduler single = Schedulers.single();
final long l = System.nanoTime();
ConnectableObservable<Long> dataStream =
Observable.just(1, 2, 3)
.map(i -> System.nanoTime())
.subscribeOn(Schedulers.computation())
.observeOn(single)
.publish();
dataStream.subscribe(i -> System.out.println("1 " + (i - l)));
dataStream.subscribe(i -> System.out.println("2 " + (i - l)));
Thread.sleep(1000);
dataStream.connect();
Thread.sleep(1000);
}
您会注意到,在第一秒(第一个 Thread.sleep()
调用)没有任何反应,并且在调用 dataStream.connect()
后排放发生。
refCount()
接受 ConnectableObservable,并通过计算当前订阅的订阅者数量来向订阅者隐藏调用 connect()
的需要。它的作用是在第一次订阅时调用 connect()
,并在最后一次取消订阅后取消原始可观察值的订阅。
至于publish().autoConnect()
的相互取消,之后你确实得到了一个observable,但它有一个特殊的属性,假设原始的observable通过互联网进行API调用(持续 10 秒),当您在不使用 share()
的情况下使用它时,您最终会对服务器进行与这 10 秒内的订阅数量一样多的并行查询。另一方面,使用 share()
您将只有一次调用。
如果共享的可观察量非常快地完成其工作(例如 just(1,2,3)
),您将看不到它的任何好处。
autoConnect()
/refCount()
为您提供一个您订阅的中间可观察值,而不是原始可观察值。
如果您有兴趣深入了解这本书:Reactive Programming with RxJava
关于rx-java - RxJava,一个可观察多个订阅者 : publish(). autoConnect(),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41915738/
根据下面的链接,我应该能够配置Web一键发布。甚至还有屏幕截图显示如何实现这一目标。但是,我在解决方案资源管理器中找不到该选项。我是弱智还是瞎子?! 如果有人质疑我的理智和在菜单上查找项目的能力,我很
我使用 OAuth 框架,它像这样异步创建经过身份验证的请求: OAuthSession.current.makeAuthenticatedRequest(request: myURLRequest)
我如何跨此 我为一个简单的秒表编写了代码,它也可以兼用作Rubik的立方计时器。源代码和可执行文件在这里: Cube timer 无论如何,我的疑问不是关于此代码的(它工作正常)。 我下载了我上传的可
我想使用 Apple 的新 Combine 框架从列表中的每个元素发出多个请求。然后我想要一个减少所有响应的单一结果。基本上,我想从发布者列表转到拥有响应列表的单个发布者。 我尝试制作一个发布商列表,
我在 EnvironmentObject 中为我的应用创建了一个“状态”对象像这样: class AppState: ObservableObject { @Published var cou
将企业应用程序部署到服务器(例如 Glassfish 或 JBoss)时,完全发布和增量发布有什么区别? 我看到部署的工件树中列出了几个模块,但是当我在 Web 存档上使用增量发布时,会发生一些事情,
我找不到这个记录。假设我想将一个端口发布到一个已知的地方,但有时会发布所有其他“暴露”的端口以进行调试或测试。 一个简单的 Dockerfile FROM alpine CMD /bin/sleep
在使用 ivy:publish ant 任务发布工件时,工件名称会附加我们为 ivy:publishrevision/pubrevision 属性指定的任何内容> 任务。 有没有办法将时间戳附加到这个
来自数据库系统概念,用于对象关系数据库的 SQL 命令: create type Publisher as (name varchar(20), branch varchar(20)); create
我有一个发布功能如下: Meteor.publish('tasks', function (name) { var project = Projects.findOne({name: name
我目前正在尝试实现两个出版商的合并。但是我找不到适合我的用例的解决方案。 我想合并 2 个发布者,它们都发出相同类型的结构数组。我希望合并的发布者在任一合并的发布者发出新值时发出值。 基本上这将是 P
我正在尝试复制 WWDC 2019 session “结合实践”中给出的“Wizard School Signup”示例 https://developer.apple.com/videos/play
我遇到 TweetInvi 0.9.9.7 无法上传视频的问题。该视频是一个 9MB 的 MP4 视频,我可以使用网络界面将它上传到 Twitter。我收到的错误消息是: The tweet cann
我在本地使用第三方库,我使用他们提供的步骤安装了所有内容。 我对包运行了 composer require 并运行了更新。这安装到 vendor 文件夹中。 然后我将路径添加到 config/app
尝试编译以下代码时: class LoginViewModel: ObservableObject, Identifiable { @Published var mailAdress: Str
我使用 .NET Core Framework 在 Visual Studio 2015 中创建了一个简单的 Web API 项目。当我使用默认设置发布此项目时,它会创建以下内容: 总共有 155 个
我正在 Laravel 7 中实现一个包并使用 https://github.com/jeroennoten/Laravel-AdminLTE作为引用。 在我的包内,我有以下结构 packages/m
当我尝试使用 Google 的结构化数据测试工具验证我的结构化数据时,出现错误: The attribute publisher.itemtype has an invalid value. 我在这条
刚从使用 Books 应用程序示例的 Djangobook 教程中学习时,您通过多对多关系将 Book 与 Author 相关,并将 Book 与 Publisher 相关。您可以使用 p.book_
我只是不得不这样做。绝对每个问题我都查找了有关此问题的问题,但他们的答案都没有帮助我解决问题。 我正在尝试在我的 Facebook 页面上发帖。 问题是: 错误:“(#100)您不能在已发布的帖子上指
我是一名优秀的程序员,十分优秀!