gpt4 book ai didi

java - Rabbit Mq java客户端并行消费

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:18:02 28 4
gpt4 key购买 nike

我想并行处理来自 rabbitMq 队列的消息。队列配置为 autoAck =false。我正在使用 camel-rabbitMQ 支持 camel endpoints ,它支持 threadPoolSize 参数,但这没有达到预期的效果。即使在 threadpoolsize=20 时,消息仍会在队列外按顺序处理。

通过代码调试,我可以看到 threadpoolsize 参数用于创建一个 ExecutorService,该 ExecutorService 用于传递给 rabbit connectionfactory,如所述 here .在您进入兔子 ConsumerWorkService 之前,这一切看起来都不错。这里的消息在最大大小为 16 条消息的 block 中处理。 block 中的每条消息都按顺序处理,然后如果有更多工作要做,则执行程序服务将调用下一个 block 。下面是一个代码片段。从执行程序服务的这种使用中,我看不出如何并行处理消息。 executorservice 一次只能执行一项工作。

我错过了什么?

private final class WorkPoolRunnable implements Runnable {

public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}

最佳答案

RabbitMQ 的文档对此不是很清楚,但是,即使 ConsumerWorkService 使用线程池,这个池似乎并没有用于并行处理消息:

Each Channel has its own dispatch thread. For the most common use case of one Consumer per Channel, this means Consumers do not hold up other Consumers. If you have multiple Consumers per Channel be aware that a long-running Consumer may hold up dispatch of callbacks to other Consumers on that Channel.

( http://www.rabbitmq.com/api-guide.html )

本文档建议每个线程使用一个 Channel,事实上,如果您简单地创建与所需的并发级别一样多的 Channel,消息将在链接到这些 channel 的消费者。

我测试了 2 个 channel 和消费者:当队列中有 2 条消息时,每个消费者一次只能选择一条消息。您提到的 16 条消息的 block 似乎没有干扰,这是一件好事。

事实上,Spring AMQP 还创建了多个 channel 来并发处理消息。这是通过以下方式完成的:

我还测试了它是否按预期工作。

关于java - Rabbit Mq java客户端并行消费,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/19248930/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com