java - Unable to send to an XA JMS queue using JmsTemplate

Reposted. Author: 行者123. Updated: 2023-11-30 06:00:48

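My Spring Boot application reads some data from a database and writes those objects to a JMS queue. I use XA for transaction handling. However, when I send the objects to the queue, I get the following exception: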

2018-09-13 13:44:46 ERROR c.s.c.m2m.MachineMessageProcessor [] - Processing of messages failed
org.springframework.jms.UncategorizedJmsException: Uncategorized exception occurred during JMS processing; nested exception is com.atomikos.jms.AtomikosTransactionRequiredJMSException: The JMS session you are using requires a JTA transaction context for the calling thread and none was found.
Please correct your code to do one of the following:
1. start a JTA transaction if you want your JMS operations to be subject to JTA commit/rollback, or
2. increase the maxPoolSize of the AtomikosConnectionFactoryBean to avoid transaction timeout while waiting for a connection, or
3. create a non-transacted session and do session acknowledgment yourself, or
4. set localTransactionMode to true so connection-level commit/rollback are enabled.
at org.springframework.jms.support.JmsUtils.convertJmsAccessException(JmsUtils.java:316)
at org.springframework.jms.support.JmsAccessor.convertJmsAccessException(JmsAccessor.java:169)
at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:487)
at org.springframework.jms.core.JmsTemplate.send(JmsTemplate.java:570)
at org.springframework.jms.core.JmsTemplate.convertAndSend(JmsTemplate.java:696)
at com.schaerer.coffeelink.m2m.MachineStatusReader.processMessages(MachineStatusReader.java:62)
at com.schaerer.coffeelink.m2m.BatchedMessageProcessor.processPage(BatchedMessageProcessor.java:70)
at com.schaerer.coffeelink.m2m.BatchedMessageProcessor.access$000(BatchedMessageProcessor.java:22)
at com.schaerer.coffeelink.m2m.BatchedMessageProcessor$1.doInTransactionWithoutResult(BatchedMessageProcessor.java:57)
at org.springframework.transaction.support.TransactionCallbackWithoutResult.doInTransaction(TransactionCallbackWithoutResult.java:34)
at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
at com.schaerer.coffeelink.m2m.MachineMessageProcessor.executeInNewTransaction(MachineMessageProcessor.java:146)
at com.schaerer.coffeelink.m2m.BatchedMessageProcessor.processPageInTransaction(BatchedMessageProcessor.java:53)
at com.schaerer.coffeelink.m2m.BatchedMessageProcessor.execute(BatchedMessageProcessor.java:45)
at com.schaerer.coffeelink.m2m.DataReaderScheduler.runDataReader(DataReaderScheduler.java:31)
at com.schaerer.coffeelink.m2m.DataReaderScheduler$$FastClassBySpringCGLIB$$56ffc69e.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:720)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:157)
at org.springframework.aop.aspectj.MethodInvocationProceedingJoinPoint.proceed(MethodInvocationProceedingJoinPoint.java:85)
at com.schaerer.coffeelink.common.security.api.authorization.RunAsRoleAspect.aroundwithResourceAccessPermissionAnnotation(RunAsRoleAspect.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethodWithGivenArgs(AbstractAspectJAdvice.java:629)
at org.springframework.aop.aspectj.AbstractAspectJAdvice.invokeAdviceMethod(AbstractAspectJAdvice.java:618)
at org.springframework.aop.aspectj.AspectJAroundAdvice.invoke(AspectJAroundAdvice.java:70)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:168)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179)
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:655)
at com.schaerer.coffeelink.m2m.DataReaderScheduler$$EnhancerBySpringCGLIB$$a8c5dee8.runDataReader(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:65)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

The JMS configuration is as follows:

@Configuration
@EnableJms
@EnableAutoConfiguration
@EnableTransactionManagement
public class JmsConfiguration {

    @Value("${messaging.broker-url}")
    private String brokerUrl;

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory(DefaultJmsListenerContainerFactoryConfigurer configurer,
                                                                      ConnectionFactory jmsConnectionFactory,
                                                                      PlatformTransactionManager platformTransactionManager) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        configurer.configure(factory, jmsConnectionFactory);
        factory.setTransactionManager(platformTransactionManager);
        factory.setSessionTransacted(true);
        factory.setSessionAcknowledgeMode(Session.DUPS_OK_ACKNOWLEDGE);
        return factory;
    }

    @Bean
    public ConnectionFactory jmsConnectionFactory() {
        ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory(brokerUrl);
        activeMQXAConnectionFactory.setTrustAllPackages(true);

        AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
        atomikosConnectionFactoryBean.setLocalTransactionMode(false);
        atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
        return atomikosConnectionFactoryBean;
    }

    @Bean
    public JmsTemplate JmsTemplate(ConnectionFactory jmsConnectionFactory) {
        JmsTemplate jmsTemplate = new JmsTemplate();
        jmsTemplate.setConnectionFactory(jmsConnectionFactory);
        jmsTemplate.setSessionTransacted(true);
        return jmsTemplate;
    }

    @Bean
    public PlatformTransactionManager platformTransactionManager(UserTransactionManager atomikosUserTransactionManager,
                                                                 UserTransaction atomikosUserTransaction) {
        JtaTransactionManager manager = new JtaTransactionManager();
        manager.setTransactionManager(atomikosUserTransactionManager);
        manager.setUserTransaction(atomikosUserTransaction);
        manager.setAllowCustomIsolationLevels(true);
        return manager;
    }

    @Bean
    UserTransactionManager atomikosUserTransactionManager() {
        UserTransactionManager manager = new UserTransactionManager();
        manager.setStartupTransactionService(true);
        manager.setForceShutdown(false);
        return manager;
    }

    @Bean
    UserTransaction atomikosUserTransaction() {
        return new UserTransactionImp();
    }
}

The calling code is:

@Transactional
private void processMessages(List<MachineStatus> machineStatuses) {
    if (machineStatuses != null && !machineStatuses.isEmpty()) {
        jmsAdapter.convertAndSend(MACHINE_STATUS_QUEUE, machineStatuses, message -> {
            message.setJMSCorrelationID(UUID.randomUUID().toString());
            return message;
        });
        setLastId(machineStatuses.get(machineStatuses.size() - 1).getId());
    }
}

Does anyone know what I am doing wrong?

Best answer

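You are creating jmsConnectionFactory twice: once through your @Bean annotation and a second time manually. Instead of having it injected into jmsTemplate as a method parameter: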

@Bean
public JmsTemplate JmsTemplate(ConnectionFactory jmsConnectionFactory) {
    // initialization code here
}

you are creating it a second time by hand:

jmsTemplate.setConnectionFactory(jmsConnectionFactory());

The same applies to jmsListenerContainerFactory.

Do not create it more than once; use injection instead.

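Update: you cannot use the @Transactional annotation on a private method. Spring applies the annotation through a proxy, and a proxy only intercepts public methods that are called from outside the bean. Put your annotation on the public listener interface.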
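To illustrate why a proxy-applied transaction never starts for a call that does not go through the proxy, here is a minimal plain-Java sketch. It is not Spring's or Atomikos's implementation: it uses a JDK dynamic proxy and a ThreadLocal flag as stand-ins, and every name in it (ProxyTransactionDemo, MessageProcessor, Reader, TX_ACTIVE, send, transactionalProxy) is hypothetical and chosen only for this example.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;

public class ProxyTransactionDemo {

    // Stand-in for a thread-bound transaction context (assumption, not the Atomikos API).
    public static final ThreadLocal<Boolean> TX_ACTIVE = ThreadLocal.withInitial(() -> false);

    // Stand-in for an XA-aware send: it fails when the calling thread has no transaction.
    public static String send(String payload) {
        if (!TX_ACTIVE.get()) {
            throw new IllegalStateException("no transaction context for the calling thread");
        }
        return "sent:" + payload;
    }

    public interface MessageProcessor {
        String processMessages(String payload);
    }

    public static class Reader implements MessageProcessor {
        @Override
        public String processMessages(String payload) {
            return send(payload);
        }

        // Invokes the method on 'this', so neither the proxy nor its interceptor is involved.
        public String runWithoutProxy(String payload) {
            return this.processMessages(payload);
        }
    }

    // Wraps the target so that every call made through the interface runs in a simulated transaction.
    public static MessageProcessor transactionalProxy(MessageProcessor target) {
        InvocationHandler handler = (proxy, method, args) -> {
            TX_ACTIVE.set(true);                 // simulated "begin"
            try {
                return method.invoke(target, args);
            } catch (InvocationTargetException e) {
                throw e.getCause();              // rethrow the target's own exception
            } finally {
                TX_ACTIVE.set(false);            // simulated "commit or rollback"
            }
        };
        return (MessageProcessor) Proxy.newProxyInstance(
                MessageProcessor.class.getClassLoader(),
                new Class<?>[] {MessageProcessor.class},
                handler);
    }

    public static void main(String[] args) {
        Reader reader = new Reader();
        MessageProcessor proxied = transactionalProxy(reader);

        // Call through the proxy: the interceptor opens the simulated transaction first.
        System.out.println(proxied.processMessages("status"));

        // Call on the target itself: the interceptor is bypassed and the send fails.
        try {
            reader.runWithoutProxy("status");
        } catch (IllegalStateException e) {
            System.out.println("failed: " + e.getMessage());
        }
    }
}
```

In this sketch the call made through the proxy succeeds, while the call made directly on the target fails with a "no transaction context" error. That is the same kind of failure reported in the question, where the annotated method is private and therefore cannot be intercepted.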

Regarding "java - Unable to send to an XA JMS queue using JmsTemplate", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/52315341/
