gpt4 book ai didi

java - 执行器服务 RabbitMQ 中只有一个线程同时运行

转载 作者:太空宇宙 更新时间:2023-11-04 11:16:19 25 4
gpt4 key购买 nike

我已经创建了与具有 20 个核心的指定线程池的连接。

        ConnectionFactory factory = new ConnectionFactory();
....
//specified es
ExecutorService consumerExecutor = Executors.newFixedThreadPool(threadNum, threadFactory);
con = factory.newConnection(consumerExecutor, addresses);

然后从此连接创建一个 channel :

        final Channel channel = connection.createChannel();

并使用它创建一个 DefaultConsumer。

虽然我发现虽然可以使用线程来消费消息,但始终只有一个线程在消费消息,即使消息在服务器中大量积累。

我查看源代码并发现:

private final class WorkPoolRunnable implements Runnable {

@Override
public void run() {
int size = MAX_RUNNABLE_BLOCK_SIZE;
List<Runnable> block = new ArrayList<Runnable>(size);
try {
Channel key = ConsumerWorkService.this.workPool.nextWorkBlock(block, size);
if (key == null) return; // nothing ready to run
try {
for (Runnable runnable : block) {
runnable.run();
}
} finally {
if (ConsumerWorkService.this.workPool.finishWorkBlock(key)) {
ConsumerWorkService.this.executor.execute(new WorkPoolRunnable());
}
}
} catch (RuntimeException e) {
Thread.currentThread().interrupt();
}
}
}


/* Basic work selector and state transition step */
private K readyToInProgress() {
K key = this.ready.poll();
if (key != null) {
this.inProgress.add(key);
}
return key;
}


/**
* Return the next <i>ready</i> client,
* and transfer a collection of that client's items to process.
* Mark client <i>in progress</i>.
* If there is no <i>ready</i> client, return <code><b>null</b></code>.
* @param to collection object in which to transfer items
* @param size max number of items to transfer
* @return key of client to whom items belong, or <code><b>null</b></code> if there is none.
*/
public K nextWorkBlock(Collection<W> to, int size) {
synchronized (this) {
K nextKey = readyToInProgress();
if (nextKey != null) {
VariableLinkedBlockingQueue<W> queue = this.pool.get(nextKey);
drainTo(queue, to, size);
}
return nextKey;
}
}

技巧应该在ConsumerWorkService.this.workPool.nextWorkBlock中,它从就绪队列中轮询 channel ,并在运行回调run()后添加到完成 block 中的读取队列。如果我错了,请纠正我。

这很令人困惑,因为消费者绑定(bind)到一个 channel ,并且在最后一个任务 block 完成之前该 channel 不会释放到队列,这意味着线程池始终只为该消费者提供一个线程。

问题:

  1. 为什么 RabbitMQ 设计这个模型
  2. 我们如何优化这个问题
  3. 是否可以将任务提交到handleDelivery中的独立线程池来消费消息并确认(以确保仅在任务完成后才确认消息)

最佳答案

> 1. RabbitMQ 为什么设计这个模型

我自己也想知道原因。但这个事实清楚地反射(reflect)在他们的documentation中:

Each Channel has its own dispatch thread. For the most common use caseof one Consumer per Channel, this means Consumers do not hold up otherConsumers. If you have multiple Consumers per Channel be aware that along-running Consumer may hold up dispatch of callbacks to otherConsumers on that Channel.

> 2.如何优化这个问题

您可以拥有多个 channel ,也可以通过将实际工作提交到另一个线程池来将消息消耗处理分离。您可以在this article中找到更多详细信息.

> 3.是否可以将任务提交到handleDelivery中的独立线程池来消费消息和ack(以确保任务完成后才收到消息ack)

引自 docs :

When manual acknowledgements are used, it is important to considerwhat thread does the acknowledgement. If it's different from thethread that received the delivery (e.g. Consumer#handleDeliverydelegated delivery handling to a different thread), acknowledging withthe multiple parameter set to true is unsafe and will result indouble-acknowledgements, and therefore a channel-level protocolexception that closes the channel. Acknowledging a single message at atime can be safe.

关于java - 执行器服务 RabbitMQ 中只有一个线程同时运行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45428614/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com