gpt4 book ai didi

java - 如何提高 RabbitMQ 发送/接收速度

转载 作者:塔克拉玛干 更新时间:2023-11-02 08:18:50 29 4
gpt4 key购买 nike

我有一个使用 RabbitMQ 的项目。在最好的情况下,它每秒可以接收 3000 条消息。这是我的消费者代码:

package com.mdnaRabbit.worker;

import java.io.IOException;
import java.math.RoundingMode;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.QueueingConsumer;
import com.mdnaRabbit.worker.data.Data;
import org.apache.commons.lang.SerializationUtils;

public class App {

private static final String TASK_QUEUE_NAME = "task_queue";
private static int i = 0;
private static long timeStart;
private static long timeFinish;
private static long messPerSec;
public static void main (String[] argv) throws IOException,InterruptedException{

ExecutorService threader = Executors.newFixedThreadPool(20);
ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

Connection connection = factory.newConnection(threader);
final Channel channel = connection.createChannel();

channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

channel.basicQos(50);

final QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(TASK_QUEUE_NAME, false, consumer);

timeStart = System.currentTimeMillis();

try {

while (i<100000) {

try {QueueingConsumer.Delivery delivery = consumer.nextDelivery();
Data mess = Data.fromBytes(delivery.getBody());

System.out.println(" [" + (i++) +"] Received " + mess.getHeader());

channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}catch (Exception e){
}
}
} catch (Exception e){
e.printStackTrace();
}

timeFinish = System.currentTimeMillis();
messPerSec = Math.round ((i*1000)/(timeFinish - timeStart));

System.out.println( "receives " + messPerSec + " per second");

channel.close();
connection.close();
}
}

如您所见,我正在使用 ExecutorService 和 channel.basicQos() 来提高速度,但这对我帮助不大。有没有办法提高接收/发送速度(发送速度的提高我认为和接收速度一样)

最佳答案

我以前没有使用过 RabbitMQ,但我可以分享我从其他消息传递中间件获得的经验。毕竟他们面临着同样的挑战。

通过调整您的 MQ 服务器,您可以增加每秒的消息数量,但您必须放弃某种功能,例如保证交货。如果您使用的是单个队列,则添加的线程越多,对队列锁的争用就越多。

我过去所做的并设法将性能提高了 x300 倍的是增加消息大小,即增加可以由单个线程自动完成的工作。我将尝试在下面描述我的算法(如果有人知道它的名字,我将不胜感激)。

  1. 创建一个接收消息的threadLocal集合
  2. 通过计时器线程提供对这些集合的(线程安全的)访问
  3. 设置时间和批量大小限制
  4. 传递消息时,检查大小是否已达到适当的批量大小。如果有,则刷新集合(即打包所有消息并一次性发送)
  5. 为了满足计时问题,让计时器线程定期检查从传递第一条消息时起耗时。如果它超过了你的阈值,那么就会像上面那样刷新集合。 (冲洗/检查时不要忘记同步)

(4)。您不仅消除了网络延迟,还消除了磁盘 IO 时间,因为普通 HDD 将花费相同的时间来写入 1 个字节和大约 1.5MB。

* 在我的例子中,这更复杂,因为每个交付线程都应该得到保证。为了支持这一点,您需要实现某种Barrier,以便传递线程将阻塞,直到主线程收到 ACK

** 您可能还想实现备份策略,以防一条消息失败,这意味着整个批处理都将失败。如果批处理失败,我建议作为单独的消息发送。

*** 在其他 MQ 中,有各种设置可能会阻碍您的性能。这些包括节流、将生产者限制在一定大小的队列之上、批处理消费者消息、多级别保证模式。 (请参阅下面的潜在组合)

**** 可能会降低性能的组合如下:队列的最大大小为 10MB,消费者预取 1000 条消息。为举例起见,假设每条消息的大小为 10K。此示例将导致(在某些 MQ 中)单个消费者线程获取所有消息(即使您有 100 个消费者线程)。另一方面,生产者将受到限制,不允许添加任何超过 10MB 限制的消息。这里的解决方案是增加队列 MaxSize 并减少预取大小。监视/分析/日志记录始终是您的 friend 。

希望这能让您更好地理解您的问题。 (顺便说一句,持久模式下 3000 条消息/秒还不错。)

关于java - 如何提高 RabbitMQ 发送/接收速度,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/15548444/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com