gpt4 book ai didi

java - 每分钟发送 35000 条 jms 消息

转载 作者:行者123 更新时间:2023-11-29 04:08:41 25 4
gpt4 key购买 nike

我们有一个 spring boot 应用程序,用于对另一个组件执行负载测试。我们每分钟最多需要发送 35000 条 JMS 消息,因此我使用调度程序每分钟运行一次任务。

问题是当我保持低强度时,它设法在指定的时间间隔(一分钟)内发送消息。但是当强度很高时,发送大量消息需要超过 1 分钟的时间。对以下实现有何建议?

调度器类

@Component
public class MessageScheduler {

private final Logger log = LoggerFactory.getLogger(getClass());
private static ScheduledExecutorService executorService = Executors.newScheduledThreadPool(16);
private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);

@Autowired
JmsSender sender;

public void startScheduler() {
Runnable runnableTask = sender::sendMessagesChunk;
executorService.scheduleAtFixedRate(runnableTask, 0, TIME_PERIOD,
TimeUnit.MILLISECONDS);
}
}

发送消息的类

@Component
public class JmsSender {

@Autowired
TrackingManager manager;

private final Logger log = LoggerFactory.getLogger(getClass());
private final static int TOTAL_MESSAGES = ConfigFactory.getConfig().getInt("total.tracking.messages").orElse(10);
private final static int TIME_PERIOD = ConfigFactory.getConfig().getInt("messages.period").orElse(60000);
private static int failedPerPeriod=0;
private static int totalFailed=0;
private static int totalMessageCounter=0;

public void sendMessagesChunk() {
log.info("Started at: {}", Instant.now());
log.info("Sending messages with intensity {} messages/minute", TOTAL_MESSAGES);
for (int i=0; i<TOTAL_MESSAGES; i++) {
try {
long start = System.currentTimeMillis();
MessageDTO msg = manager.createMessage();
send(msg);
long stop = System.currentTimeMillis();
if (timeOfDelay(stop-start)>=0L) {
Thread.sleep(timeOfDelay(stop-start));
}
} catch (Exception e) {
log.info("Error : " + e.getMessage());
failedPerPeriod++;
}
}
totalMessageCounter += TOTAL_MESSAGES;
totalFailed += failedPerPeriod;
log.info("Finished at: {}", Instant.now());
log.info("Success rate(of last minute): {} %, Succeeded: {}, Failed: {}, Success rate(in total): {} %, Succeeded: {}, Failed: {}"
,getSuccessRatePerPeriod(), getSuccededPerPeriod(), failedPerPeriod,
getTotalSuccessRate(), getTotalSucceded(), totalFailed);
failedPerPeriod =0;
}

private long timeOfDelay(Long elapsedTime){
return (TIME_PERIOD / TOTAL_MESSAGES) - elapsedTime;
}
private int getSuccededPerPeriod(){
return TOTAL_MESSAGES - failedPerPeriod;
}

private int getTotalSucceded(){
return totalMessageCounter - totalFailed;
}

private double getSuccessRatePerPeriod(){
return getSuccededPerPeriod()*100D / TOTAL_MESSAGES;
}

private double getTotalSuccessRate(){
return getTotalSucceded()*100D / totalMessageCounter;
}

private void send(MessageDTO messageDTO) throws Exception {
requestContextInitializator();
JmsClient client = JmsClientBuilder.newClient(UriScheme.JmsType.AMQ);
client.target(new URI("activemq:queue:" + messageDTO.getDestination()))
.msgTypeVersion(messageDTO.getMsgType(), messageDTO.getVersion())
.header(Header.MSG_VERSION, messageDTO.getVersion())
.header(Header.MSG_TYPE, messageDTO.getMsgType())
.header(Header.TRACKING_ID, UUID.randomUUID().toString())
.header(Header.CLIENT_ID, "TrackingJmsClient")
.post(messageDTO.getPayload());
}

最佳答案

你应该解决两个问题:

  1. 总发送操作时间必须低于最大时间。
  2. 不应尽可能快地发送消息,而是应在所有可用时间统一发送。

显然,如果您的send 方法太慢,将会超过最大时间。

发送消息的更快方法是使用某种批量操作。没关系,如果您的 MQ API 不支持批量操作,您将无法使用它!因为第二个限制(“统一”)。

您可以异步发送消息,但如果您的 MQ API 为此创建线程而不是“非阻塞”异步,您可能会遇到内存问题。

使用 javax.jms.MessageProducer.send 可以异步发送消息,但是会为每个消息创建一个新线程(会创建大量内存和服务器线程)。

另一个加速可以是只创建一个 JMS 客户端(您的 send 方法)。

要实现第二个要求,您应该修复您的timeOfDelay 函数,这是错误的。实际上,您应该考虑 send 函数的概率分布来估计适当的值,但是,您可以简单地执行以下操作:

    long accTime = 0L;
for (int i=0; i<TOTAL_MESSAGES; i++) {
try {
long start = System.currentTimeMillis();
MessageDTO msg = manager.createMessage();
send(msg);
long stop = System.currentTimeMillis();
accTime += stop - start;
if(accTime < TIME_PERIOD)
Thread.sleep((TIME_PERIOD - accTime) / (TOTAL_MESSAGES - i));
} catch (Exception e) {
log.info("Error : " + e.getMessage());
failedPerPeriod++;
}
}

关于java - 每分钟发送 35000 条 jms 消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56533029/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com