gpt4 book ai didi

java - Rabbitmq 中的并发生产者无限期阻塞

转载 作者:行者123 更新时间:2023-12-01 11:29:23 26 4
gpt4 key购买 nike

我在使用 spring-rabbit-1.3.9.RELEASE 库对 Rabbitmq 3.3.5 进行 POC 时观察到奇怪的行为

当我启动单个生产线程时,一切运行顺利。但是,如果同时启动超过 1 个线程,则只有其中一个线程完成,所有其他线程都会无限期地阻塞,即使队列变空后也是如此。

当从 rabbitmqctl list_connections 进行监控时,被阻止线程的连接状态仍保持运行。应该注意的是,当生产者阻塞时或在整个运行期间的任何其他时间都不会发出警报。

我还观察到,如果我在每次发送后 hibernate 1 毫秒,问题就会消失。

所以,我有这些问题

  1. rabbitmq不支持并发生产者、高速率发布吗?
  2. 即使连接确实被阻止,为什么它没有显示在rabbitmqctl list_connections中?
  3. 为什么它们会无限期地阻塞并且无法恢复乳清队列变空?

代码

    public static void main(String[] argv) throws java.io.IOException, InterruptedException {
init();
PocConfig config = new PocConfig();
int threadCount = config.getThreadCount();
final int eventsPerThread = config.getEvents() / threadCount;
final long sleep = config.getSleep();

System.out.println("Start producer with configuration [threadCount=" + threadCount + ", events=" + eventsPerThread + ", sleep="
+ sleep + "]");

ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
for (int i = 0; i < threadCount; i++) {
final int threadId = i;
executorService.submit(new Runnable() {
public void run() {
produce(eventsPerThread, sleep, threadId);
}
});
}
waitAndTearDown(executorService);
}

private static void produce(int events, long sleep, int threadId) {
long start = System.currentTimeMillis();
for (int index = 1; index <= events; index++) {
try {
byte[] message = messageFactory.createTestMessage(index);
amqpTemplate.convertAndSend(QUEUE_NAME, message);
if (sleep > 0) {
Thread.sleep(sleep);
}
} catch (Exception e) {
LOG.error("Error", e);
}
}
long time = System.currentTimeMillis() - start;
System.out.println("Producer:" + threadId + " finished, events: " + events + ", Time(s): " + time / 1000 + ", tps: " + (events * 1000) / time);
}

Spring 配置

<bean id="connectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory">
<property name="addresses" value="${addresses}" />
<property name="username" value="${user}" />
<property name="password" value="${passwd}" />
<property name="cacheMode" value="CONNECTION" />
<property name="connectionCacheSize" value="${threads}" />
<property name="channelCacheSize" value="10" />
</bean>

<rabbit:template id="template" connection-factory="connectionFactory"
exchange="testExchange" routing-key="testQueue"/>

最佳答案

我想不出有什么会阻止的,所以我只是运行了你的测试;并且没有任何问题:

Start producer with configuration [threadCount=5, events=10, sleep=0]
Producer:2 finished, events: 1000, Time(s): 0, tps: 4405
Producer:3 finished, events: 1000, Time(s): 0, tps: 4132
Producer:1 finished, events: 1000, Time(s): 0, tps: 4048
Producer:0 finished, events: 1000, Time(s): 0, tps: 3968
Producer:4 finished, events: 1000, Time(s): 0, tps: 3952

是什么让您认为他们被屏蔽了?

进行线程转储(例如使用 jstack)来查看线程正在做什么。

编辑:

即使使用 1M 消息和 CacheMode CONNECTION,我仍然无法重现它...

Start producer with configuration [threadCount=5, events=200000, sleep=0]
Producer:0 finished, events: 200000, Time(s): 50, tps: 3959
Producer:3 finished, events: 200000, Time(s): 53, tps: 3746
Producer:1 finished, events: 200000, Time(s): 55, tps: 3635
Producer:2 finished, events: 200000, Time(s): 55, tps: 3634
Producer:4 finished, events: 200000, Time(s): 55, tps: 3629

确实看到队列进入flow模式(通过管理UI),但一切都恢复得很好。

我确实看到您的工作人员受到流量控制...

"pool-2-thread-3" prio=10 tid=0x00007f4af4849800 nid=0x65d5 runnable [0x00007f4ae082f000]
java.lang.Thread.State: RUNNABLE
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:113)

你在兔子日志中看到什么了吗?您在管理 UI 上看到的有关消息速率、状态等的内容是什么?

无论如何,这似乎与 Spring AMQP 没有任何关系;您需要联系rabbitmq-users Google 群组中的rabbitmq 人员。

(我正在使用rabbitmq 3.4.2进行测试)。

编辑2:

完全全新安装 3.5.2...

Start producer with configuration [threadCount=5, events=200000, sleep=0]
Producer:0 finished, events: 200000, Time(s): 39, tps: 5091
Producer:1 finished, events: 200000, Time(s): 39, tps: 5002
Producer:2 finished, events: 200000, Time(s): 40, tps: 4954
Producer:3 finished, events: 200000, Time(s): 40, tps: 4951
Producer:4 finished, events: 200000, Time(s): 40, tps: 4939

并且我在管理 UI 中没有看到 flow 状态(在队列上,但 channel /连接显示它们处于 flow 状态,但再次恢复)。

关于java - Rabbitmq 中的并发生产者无限期阻塞,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30544330/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com