gpt4 book ai didi

java - Apache Camel : async operation and backpressure

转载 作者:太空宇宙 更新时间:2023-11-04 11:26:42 33 4
gpt4 key购买 nike

在 Apache Camel 2.19.0 中,我想在并发 seda 队列上异步生成消息并使用结果,同时如果 seda 队列上的执行程序已满,则阻塞。它背后的用例:我需要处理包含多行的大文件,并且需要为其创建批处理,因为每行的一条消息开销太大,而我无法将整个文件放入堆中。但最后,我需要知道我触发的所有批处理是否都已成功完成。因此,我需要一个反压机制来向队列发送垃圾邮件,同时希望利用多线程处理。

这是 Camel 和 Spring 中的一个简单示例。我配置的路线:

package com.test;

import org.apache.camel.builder.RouteBuilder;
import org.springframework.stereotype.Component;

@Component
public class AsyncCamelRoute extends RouteBuilder {

public static final String ENDPOINT = "seda:async-queue?concurrentConsumers=2&size=2&blockWhenFull=true";

@Override
public void configure() throws Exception {
from(ENDPOINT)
.process(exchange -> {
System.out.println("Processing message " + (String)exchange.getIn().getBody());
Thread.sleep(10_000);
});
}
}

生产者看起来像这样:

package com.test;

import org.apache.camel.ProducerTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

@Component
public class AsyncProducer {

public static final int MAX_MESSAGES = 100;

@Autowired
private ProducerTemplate producerTemplate;

@EventListener
public void handleContextRefresh(ContextRefreshedEvent event) throws Exception {
new Thread(() -> {
// Just wait a bit so everything is initialized
try {
Thread.sleep(5_000);
} catch (InterruptedException e) {
e.printStackTrace();
}
List<CompletableFuture> futures = new ArrayList<>();

System.out.println("Producing messages");
for (int i = 0; i < MAX_MESSAGES; i++) {
CompletableFuture future = producerTemplate.asyncRequestBody(AsyncCamelRoute.ENDPOINT, String.valueOf(i));
futures.add(future);
}
System.out.println("All messages produced");

System.out.println("Waiting for subtasks to finish");
futures.forEach(CompletableFuture::join);
System.out.println("Subtasks finished");
}).start();

}
}

此代码的输出如下所示:

Producing messages
All messages produced
Waiting for subtasks to finish
Processing message 6
Processing message 1
Processing message 2
Processing message 5
Processing message 8
Processing message 7
Processing message 9
...
Subtasks finished

因此,似乎 blockIfFull 被忽略,所有消息都在处理之前创建并放入队列中。

有什么方法可以创建消息,以便我可以在 Camel 中使用异步处理,同时确保如果有太多未处理的元素,将元素放入队列会阻塞?

最佳答案

我通过使用流和自定义拆分器解决了这个问题。通过这样做,我可以使用迭代器将源行拆分为 block ,该迭代器返回行列表而不是仅返回单行。有了这个,我觉得我可以根据需要使用Camel了。

因此该路线包含以下部分:

.split().method(new SplitterBean(), "splitBody").streaming().parallelProcessing().executorService(customExecutorService)

使用具有上述行为的定制分离器。

关于java - Apache Camel : async operation and backpressure,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44279526/

33 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com