- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
在 Apache Camel 2.19.0 中,我想在并发 seda 队列上异步生成消息并使用结果,同时如果 seda 队列上的执行程序已满,则阻塞。它背后的用例:我需要处理包含多行的大文件,并且需要为其创建批处理,因为每行的一条消息开销太大,而我无法将整个文件放入堆中。但最后,我需要知道我触发的所有批处理是否都已成功完成。因此,我需要一个反压机制来向队列发送垃圾邮件,同时希望利用多线程处理。
这是 Camel 和 Spring 中的一个简单示例。我配置的路线:
package com.test;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;
@Component
public class AsyncCamelRoute extends RouteBuilder {
public static final String ENDPOINT = "seda:async-queue?concurrentConsumers=2&size=2&blockWhenFull=true";
@Override
public void configure() throws Exception {
from(ENDPOINT)
.process(exchange -> {
System.out.println("Processing message " + (String)exchange.getIn().getBody());
Thread.sleep(10_000);
});
}
}
生产者看起来像这样:
package com.test;
import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
@Component
public class AsyncProducer {
public static final int MAX_MESSAGES = 100;
@Autowired
private ProducerTemplate producerTemplate;
@EventListener
public void handleContextRefresh(ContextRefreshedEvent event) throws Exception {
new Thread(() -> {
// Just wait a bit so everything is initialized
try {
Thread.sleep(5_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<CompletableFuture> futures = new ArrayList<>();
System.out.println("Producing messages");
for (int i = 0; i < MAX_MESSAGES; i++) {
CompletableFuture future = producerTemplate.asyncRequestBody(AsyncCamelRoute.ENDPOINT, String.valueOf(i));
futures.add(future);
}
System.out.println("All messages produced");
System.out.println("Waiting for subtasks to finish");
futures.forEach(CompletableFuture::join);
System.out.println("Subtasks finished");
}).start();
}
}
此代码的输出如下所示:
Producing messages
All messages produced
Waiting for subtasks to finish
Processing message 6
Processing message 1
Processing message 2
Processing message 5
Processing message 8
Processing message 7
Processing message 9
...
Subtasks finished
因此,似乎 blockIfFull 被忽略,所有消息都在处理之前创建并放入队列中。
有什么方法可以创建消息,以便我可以在 Camel 中使用异步处理,同时确保如果有太多未处理的元素,将元素放入队列会阻塞?
最佳答案
我通过使用流和自定义拆分器解决了这个问题。通过这样做,我可以使用迭代器将源行拆分为 block ,该迭代器返回行列表而不是仅返回单行。有了这个,我觉得我可以根据需要使用Camel了。
因此该路线包含以下部分:
.split().method(new SplitterBean(), "splitBody").streaming().parallelProcessing().executorService(customExecutorService)
使用具有上述行为的定制分离器。
关于java - Apache Camel : async operation and backpressure,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44279526/
我正在尝试制作带有背压的流动性。我的想法是,在当前项目之一完成处理之前,不会发出新的可流动项目。我正在使用 ResourceSubscriber 和 subscribeWith() 方法来实现这一点。
我一直在阅读一些关于 RxJava 背压的文档,但是我找不到详细的解释,比如它是如何在库内部发生的,每个人都只是总结说“生产者”太快而“消费者”太慢。 例如像下面的代码: Observable.int
我有一个 PublishSubject,它在某些 UI 事件上调用 onNext()。订阅者通常需要 2 秒来完成其工作。我需要在订阅者忙碌时忽略对 onNext() 的所有调用,但最后一个除外。我尝
有这样的情况,上游传递到下游的数据需要进行处理,然而上游推送的速度又很快,下游由于资源等原因来不及处理;如果这时还是通过不限制上游速度的方式推送数据,就会出问题,因此Reactive Streams有
对于一个运算符,输入流比其输出流快,因此其输入缓冲区将阻塞前一个运算符的输出线程,该输出线程将数据传输到该运算符。对吗? Flink和Spark都是通过阻塞线程来处理背压的吗?那么它们有什么区别呢?
嗯,RxJava 中的背压并不是真正的背压,而只是忽略了一些元素集合。 但是如果我不能释放任何元素并且我需要以某种方式减慢发射速度怎么办? RxJava不能影响元素发射,所以开发者需要自己实现。但是如
在 Apache Camel 2.19.0 中,我想在并发 seda 队列上异步生成消息并使用结果,同时如果 seda 队列上的执行程序已满,则阻塞。它背后的用例:我需要处理包含多行的大文件,并且需要
我有一个关于 Combine 中的 zip 运算符与背压结合的问题。 采用以下代码片段: let sequencePublisher = Publishers.Sequence, Never>(seq
我有一个 flink 作业,它从 Kafka 读取数据,执行某些聚合并将结果写入 elasticsearch 索引。我在源上看到高背压。高背压导致数据从 Kafka 读取缓慢,我看到数据在网络堆栈中排
这是我对这个主题的理解。 有发布者和订阅者。 发布者和订阅者的伪代码是这样的 Publisher{ Subscriber s; subscribe(Subscriber s){
我有一个 Spark 流应用程序,它使用 Spark Direct Streaming(不是接收器)方法从 Kafka 读取消息并按分区处理消息。 在我的 Kafka 分区中,有时我们得到的消息需要
在阅读如下配置单元表后,我试图将数据写入 Kafka 主题。 write_kafka_data.py: read_df = spark.sql("select * from db.table wher
我是一名优秀的程序员,十分优秀!