- 使用 Spring Initializr 创建 Spring Boot 应用程序
- 在Spring Boot中配置Cassandra
- 在 Spring Boot 上配置 Tomcat 连接池
- 将Camel消息路由到嵌入WildFly的Artemis上
有这样的情况,上游传递到下游的数据需要进行处理,然而上游推送的速度又很快,下游由于资源等原因来不及处理;如果这时还是通过不限制上游速度的方式推送数据,就会出问题,因此Reactive Streams有两一种处理方式,就是通过request的机制向上游传递信号,并指定接收数量;通过这种方法将push模型转化为push-pull hybrid,这就是backpressure的用法。
下面介绍backpressure比较原始的写法,通过构建Subscriber控制request的大小:
@Test
public void rawBackPressure () {
Flux<String> flux = Flux.range(1,10)
.map(i -> String.valueOf(i))
.log();
flux.subscribe(new Subscriber<String>() {
private int count = 0;
private Subscription subscription;
private int requestCount = 2;
@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
s.request(requestCount); // 启动
}
@SneakyThrows
@Override
public void onNext(String s) {
count++;
if (count == requestCount) { // 通过count控制每次request两个元素
Thread.sleep(1000);
subscription.request(requestCount);
count = 0;
}
}
@Override
public void onError(Throwable t) {
}
@Override
public void onComplete() {
}
});
}
@Test
public void baseBackPressure () {
Flux<Integer> flux = Flux.range(1,10).log();
flux.subscribe(new BaseSubscriber<Integer>() {
private int count = 0;
private final int requestCount = 2;
@Override
protected void hookOnSubscribe(Subscription subscription) {
request(requestCount);
}
@SneakyThrows
@Override
protected void hookOnNext(Integer value) {
count++;
if (count == requestCount) { // 通过count控制每次request两个元素
Thread.sleep(1000);
request(requestCount);
count = 0;
}
}
});
}
运行结果一样,request(2)之后执行两个next
@Test
public void backPressureLimitRate(){
Flux.range(1,10)
.log()
.limitRate(2)
.subscribe();
}
运行结果跟上面一样:
我正在尝试制作带有背压的流动性。我的想法是,在当前项目之一完成处理之前,不会发出新的可流动项目。我正在使用 ResourceSubscriber 和 subscribeWith() 方法来实现这一点。
我一直在阅读一些关于 RxJava 背压的文档,但是我找不到详细的解释,比如它是如何在库内部发生的,每个人都只是总结说“生产者”太快而“消费者”太慢。 例如像下面的代码: Observable.int
我有一个 PublishSubject,它在某些 UI 事件上调用 onNext()。订阅者通常需要 2 秒来完成其工作。我需要在订阅者忙碌时忽略对 onNext() 的所有调用,但最后一个除外。我尝
有这样的情况,上游传递到下游的数据需要进行处理,然而上游推送的速度又很快,下游由于资源等原因来不及处理;如果这时还是通过不限制上游速度的方式推送数据,就会出问题,因此Reactive Streams有
对于一个运算符,输入流比其输出流快,因此其输入缓冲区将阻塞前一个运算符的输出线程,该输出线程将数据传输到该运算符。对吗? Flink和Spark都是通过阻塞线程来处理背压的吗?那么它们有什么区别呢?
嗯,RxJava 中的背压并不是真正的背压,而只是忽略了一些元素集合。 但是如果我不能释放任何元素并且我需要以某种方式减慢发射速度怎么办? RxJava不能影响元素发射,所以开发者需要自己实现。但是如
在 Apache Camel 2.19.0 中,我想在并发 seda 队列上异步生成消息并使用结果,同时如果 seda 队列上的执行程序已满,则阻塞。它背后的用例:我需要处理包含多行的大文件,并且需要
我有一个关于 Combine 中的 zip 运算符与背压结合的问题。 采用以下代码片段: let sequencePublisher = Publishers.Sequence, Never>(seq
我有一个 flink 作业,它从 Kafka 读取数据,执行某些聚合并将结果写入 elasticsearch 索引。我在源上看到高背压。高背压导致数据从 Kafka 读取缓慢,我看到数据在网络堆栈中排
这是我对这个主题的理解。 有发布者和订阅者。 发布者和订阅者的伪代码是这样的 Publisher{ Subscriber s; subscribe(Subscriber s){
我有一个 Spark 流应用程序,它使用 Spark Direct Streaming(不是接收器)方法从 Kafka 读取消息并按分区处理消息。 在我的 Kafka 分区中,有时我们得到的消息需要
在阅读如下配置单元表后,我试图将数据写入 Kafka 主题。 write_kafka_data.py: read_df = spark.sql("select * from db.table wher
我是一名优秀的程序员,十分优秀!