- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我有一个发布者,其发布速度可能比订阅者处理数据的速度快。为了解决这个问题,我开始使用背压。因为我不想丢弃任何数据,所以我使用 react 拉背压。我将其理解为订阅者能够告诉发布者何时发布更多数据,如 this and the follwing paragraphs 中所述。 。
发布者是一个 Flowable,它异步并行地工作,然后合并到顺序 Flowable 中。数据应缓冲最多 10 个元素,当该缓冲区已满时,Flowable 不应发布更多数据并等待下一个请求。
订阅者是一个 DisposableSubscriber,在开始时请求 10 个项目。每个消耗的项目都需要一些计算,之后将请求新的项目。
我的 MWE 看起来像这样:
List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
src.add(i);
}
Flowable.fromIterable(src)
.parallel(10, 1)
.runOn(Schedulers.from(Executors.newFixedThreadPool(10)))
.flatMap(i -> Single.fromCallable(() -> {
System.out.println("publisher: " + i);
Thread.sleep(200);
return i;
}).toFlowable())
.sequential(1)
.onBackpressureBuffer(10)
.observeOn(Schedulers.newThread())
.subscribeOn(Schedulers.newThread())
.doOnError(Throwable::printStackTrace)
.subscribeWith(new DisposableSubscriber<Integer>() {
@Override
protected void onStart() {
request(10);
}
@Override
public void onNext(Integer integer) {
System.out.println("subscriber: " + integer);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(1);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
我期望此代码执行以下操作:订阅者请求前 10 项。出版商出版前 10 项。然后,订阅者在 onNext
中进行计算并请求更多项目,发布者将发布这些项目。
实际发生的情况:起初,发布者似乎无限制地发布项目。在某个时刻,例如发布 14 个项目后,订阅者处理其第一个项目。在此期间,发布者会继续发布项目。发布大约 30 个项目后,会抛出 io.reactivex.exceptions.MissingBackPressureException: Buffer is full
并且流结束。
我的问题:我做错了什么? 如何让订阅者控制发布者是否以及何时发布数据?显然,我正在做一些非常错误的事情。否则,期望与现实不会有如此大的差异。
上述 MWE 的输出示例:
publisher: 5
publisher: 7
publisher: 8
publisher: 0
publisher: 2
publisher: 6
publisher: 9
publisher: 3
publisher: 4
publisher: 1
publisher: 18
publisher: 17
publisher: 15
subscriber: 0
publisher: 11
publisher: 10
publisher: 19
publisher: 13
publisher: 14
publisher: 12
publisher: 16
publisher: 27
publisher: 28
publisher: 23
publisher: 21
publisher: 29
publisher: 20
publisher: 25
publisher: 22
publisher: 26
io.reactivex.exceptions.MissingBackpressureException: Buffer is full
最佳答案
不是 Rx 方面的专家,但让我尝试一下。.observeOn(...)
有自己的默认缓冲区大小 128。所以,从一开始它就会从上游请求的内容超出了您的缓冲区所能容纳的内容。
observeOn(...)
接受可选的缓冲区大小覆盖,但即使您提供它,ParallelFlowable 也会调用您的 flatMap(...)
方法的频率比您想要的要高。我不是 100% 确定为什么,也许它有自己的内部缓冲,在将轨道合并回顺序时执行。
我认为您可以通过使用 flatMap(...)
而不是 parralel(...)
并提供 maxConcurrency 参数来更接近您想要的行为。
要记住的另一件事是,您不想调用 subscribeOn(...)
- 它会影响整个上游 Flowable。因此,如果您已经在调用 parallel(...).runOn(...)
,则它不会产生任何效果,或者效果将出乎意料。
有了上述内容,我认为这会让您更接近您正在寻找的东西:
List<Integer> src = new ArrayList<>();
for (int i = 0; i < 200; i++) {
src.add(i);
}
Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10));
Flowable.fromIterable(src)
.flatMap(
i -> Flowable.just( i )
.subscribeOn(scheduler) // here subscribeOn(...) affects just this nested Flowable
.map( __ -> {
System.out.println("publisher: " + i);
Thread.sleep(200);
return i;
} ),
10) // max concurrency
.observeOn(Schedulers.newThread(), false, 10) // override buffer size
.doOnError(Throwable::printStackTrace)
.subscribeWith(new DisposableSubscriber<Integer>() {
@Override
protected void onStart() {
request(10);
}
@Override
public void onNext(Integer integer) {
System.out.println("subscriber: " + integer);
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
request(1);
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
关于java - 订阅者如何通过 react 性拉背压来控制发布者?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60706697/
我想使用 Java 实现各种发布者/订阅者模式,但目前没有想法。 有 1 个发布者和 N 个订阅者,发布者发布对象,然后每个订阅者需要按照正确的顺序处理每个对象一次且仅一次。发布者和每个订阅者在自己的
我正在学习发布者/订阅者模式,并做了一个简单的例子来更好地理解它。 var Publisher = { subscribers: {}, Subscribe: functi
这是我目前挂断的一般设计问题。它导致了一些心理代码块......如果它只是一个常见的陷阱,我不想继续这样,但如果它被接受使用,我想继续,因为代码非常干净并且解耦(除了横切消息)。 我在代码中有一个基于
当我打开安装程序(使用NSIS创建)时,会出现UAC对话框,其中包含有关安装程序的信息。字段发布者为“未知”。我听说过对应用程序进行数字签名,您知道如何在NSIS中执行此操作吗? 如何将字段/属性发布
我正在使用具有 DefaultMessageListenerContainer 的 pub/sub 模型。我已将并发消费者配置为 5 个。如何唯一标识每个消费者? 我试图将相应监听器处理的每个事件存储
我想创建一个可以从不同线程调用的 RabbitMQ 发布者。 根据 RabbitMQ 最佳实践,我不应允许在不同线程中使用同一 channel ,因此在发布者的多个实例中共享此 channel 将导致
是否可以仅通过使用事件(即不是列表或字典)在 WCF 中实现发布者/订阅者模型? 如果是,请向我提供示例应用程序的网络链接,或任何讨论此问题的文章。 最佳答案 是的。 Here是一篇关于它的 MSDN
我使用 rosjava 作为订阅者和 rospy 作为发布者实现了发布/订阅。但是,我没有收到 rospy 的任何消息。我在这里错过了什么吗? JAVA 订阅者(假设 IP 为:1.1.1.1) pu
关闭。这个问题不符合Stack Overflow guidelines .它目前不接受答案。 这个问题似乎不是关于 a specific programming problem, a softwar
我正在尝试签署一个小程序,这样发布者就不会显示为“未知”: 我为一个组织工作,我们有我们自己的证书颁发机构,证书链如下:ORG Root CA > ORG Trusted Certification
这是对我尝试使用 combine 实现的目标的过度简化。 如果发生某些事情,我需要向共享发布者注入(inject)一个值。 在这里您可以看到,如果 map 接收到数字 2,我使用原始发布者上的发送命令
我们有以下使用 .NET RabbitMQ 库的场景: 工作线程从队列中获取“请求”消息,并将它们分派(dispatch)到多个工作线程上进行处理。完成后,每个工作线程都会发送另一条消息。 我的问题是
当我将一些 Objective-C 代码移植到 Swift 时,我试图更好地理解新的 Combine 框架以及我如何使用它来重新创建一个通用的设计模式。 在这种情况下,设计模式是单个对象(管理器、服务
我有一个基本的 ZeroMQ 场景,由两个发布者和一个订阅者组成。在我决定将局域网内不同计算机上的所有进程分开之前,这在本地计算机上一直运行良好。这就是我创建 ZeroMQ 套接字的方式(简化的 Py
我目前正在尝试使用 0MQ 创建代理。我想订阅一个不在 0MQ 下运行的发布者。我的意思是我有发送 XML 文件的远程发布者的地址和端口,我想获取该文件。不幸的是,我没有收到发布者的任何消息,但它发送
我尝试使用 C++ 在 ZeroMQ 中开发发布者订阅者模型,我从 JSON 文件中提取对象值并将其发送到另一端。 我的订户部分运行良好,但出现任何错误。但是我在发布者部分面临以下错误:(在 if 语
我问过 Google 并搜索过 NServiceBus 网站和论坛,但我似乎找不到任何关于如何编写 Java 应用程序来订阅发布者的说明性指导。有没有人有任何这样的联系或经验? 最佳答案 这种情况并没
有谁知道如何使用 CruiseControl.Net 发布到 FTP 服务器? 最佳答案 我认为没有特定的方法可以做到这一点。您当然可以始终在 cruise control.net 的窗口中运行命令行
我正在尝试使用Ocelot在ASP.Net API网关中使用JWT承载身份验证,以与多个授权机构/发行者一起工作。一个颁发者是Auth0,另一个是基于IdentityServer4的内部身份验证服务器
如何删除 Visual Studio Marketplace 发布者? 在 https://marketplace.visualstudio.com/manage/publishers/ 上查看我的帐
我是一名优秀的程序员,十分优秀!