gpt4 book ai didi

java - Spring批量写入ActiveMQ

转载 作者:行者123 更新时间:2023-11-30 06:42:02 26 4
gpt4 key购买 nike

我尝试将消息写入 JMS 队列,我将在下一步中取出该消息以写入数据库。第一部分应该是同步的,第二部分应该是异步的。 JMS 部分非常慢(1 分钟内有 1100 个项目进入队列)。

这就是我的工作。

@Bean
public Job multiThreadedStepJob() {
Flow flow1 = new FlowBuilder<Flow>("subflow1").from(step()).end();
Flow flow2 = new FlowBuilder<Flow>("subflow2").from(step2()).end();
Flow splitFlow = new FlowBuilder<Flow>("splitflow")
.split(new SimpleAsyncTaskExecutor()).add(flow1, flow2) .build();

return jobBuilders.get("multiThreadedStepJob")
.start(splitFlow).end().build();

}

第一步:

@Bean
public Step step() {
return stepBuilders.get("step")
.<OrderDTO, OrderDTO>chunk(CHUNK_SIZE)
.reader(reader())
.writer(writer())
.build();
}

第二步:

@Bean
public Step step2() {
return stepBuilders.get("step2")
.<OrderDTO, OrderDTO>chunk(100)
.reader(reader2())
.writer(writer2())
.build();
}

我认为我的错误出现在步骤的编写器和步骤2的读取器中,因为我可以一起运行其他读取器和写入器,并且没有任何问题。

@Bean
public JmsItemWriter<OrderDTO> writer() {
JmsItemWriter<OrderDTO> itemWriter = new JmsItemWriter<>();
itemWriter.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
return itemWriter;
}

@Bean
public JmsItemReader<OrderDTO> reader2() {
JmsItemReader<OrderDTO> itemReader = new JmsItemReader<>();
itemReader.setJmsTemplate(infrastructureConfiguration.jmsTemplate());
itemReader.setItemType(OrderDTO.class);
return itemReader;
}

它们使用相同的 JmsTemplate 来连接到队列:

@Bean
public JmsTemplate jmsTemplate() {
JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory());
jmsTemplate.setDefaultDestination(queue());
jmsTemplate.setReceiveTimeout(500);
return jmsTemplate;
}

@Bean
public Queue queue() {
return new ActiveMQQueue("orderList");
}

@Bean
public ConnectionFactory connectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
factory.setTrustAllPackages(true);

ActiveMQPrefetchPolicy prefetchPolicy = new ActiveMQPrefetchPolicy();
prefetchPolicy.setQueuePrefetch(30);

factory.setPrefetchPolicy(prefetchPolicy);

PooledConnectionFactory pool = new PooledConnectionFactory(factory);
pool.setMaxConnections(10);
pool.setMaximumActiveSessionPerConnection(10);
pool.isCreateConnectionOnStartup();

return pool;
}

我使用的其余配置是来自@EnableBatchProcessing 的配置。有谁知道为什么进展这么慢?

最佳答案

显然是jmsTemplate.setSessionTransacted(true);真的很重要。这大大加快了 JMS 队列的写入和读取速度。由于某种原因,我认为默认值是正确的,因为我正在处理批处理。

无论如何,如果其他人遇到此问题,请先检查此问题,因为很容易忘记。

关于java - Spring批量写入ActiveMQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44242472/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com