
java - Why am I seeing multiple transactions with Spring Integration/JtaTransactionManager?

Reposted. Author: 太空宇宙. Updated: 2023-11-04 06:54:19

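I am working on a project built on the Spring Integration framework, with Atomikos providing distributed transactions. Recently we have been running integration tests to verify that messages are sent through our system correctly. While running one of these tests, I noticed that we get 10 log messages indicating that a new transaction is being created and 10 log messages indicating a transaction commit.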

Does Spring create a new transaction every time a message passes from a channel to an endpoint, and vice versa?

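The configuration below accepts a message on a message-driven channel adapter (using the transactionManager) and sends it to a router. The router then sends the message to a chain containing a transformer, a service activator (with a retryAdvice), and an outbound channel adapter.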

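As I understand it, when our message-driven channel adapter receives a message from the queue, I should see one transaction created and then committed once the message has been processed. So I expected 3 transactions in total: one from the test sending the message, one from the inbound adapter, and one from the test receiving the message.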

From spring-datasource.xml

<bean id="transactionManager"
      class="org.springframework.transaction.jta.JtaTransactionManager">
    <property name="transactionManager" ref="AtomikosTransactionManager" />
    <property name="userTransaction" ref="AtomikosUserTransaction" />
</bean>

From spring-context.xml

<jms:message-driven-channel-adapter
    id="inventoryQueue_inbound_adapter"
    destination-name="queue.inventory"
    channel="InventoryRouterChannel"
    error-channel="inventoryErrorChannel"
    transaction-manager="transactionManager"
    acknowledge="transacted"/>

<integration:router input-channel="InventoryRouterChannel">
    <bean class="com.inventory.InventoryRouter"/>
</integration:router>

<integration:chain input-channel="rxAddToCountWell">
    <integration:json-to-object-transformer type="com.events.RxAddToCountWell"/>
    <integration:service-activator ref="addToCountWellHandler" method="formatCountwellMessage">
        <integration:request-handler-advice-chain>
            <ref bean="retryAdvice"/>
        </integration:request-handler-advice-chain>
    </integration:service-activator>
    <jms:outbound-channel-adapter destination-name="OUTBOUND.QUEUE"/>
</integration:chain>

<bean id="retryAdvice" class="org.springframework.integration.handler.advice.RequestHandlerRetryAdvice">
    <property name="recoveryCallback">
        <bean class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
            <constructor-arg ref="inventoryErrorChannel"/>
        </bean>
    </property>
    <property name="retryTemplate">
        <bean class="org.springframework.retry.support.RetryTemplate">
            <property name="retryPolicy">
                <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
                    <property name="maxAttempts" value="2"/>
                </bean>
            </property>
            <property name="backOffPolicy">
                <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
                    <property name="initialInterval" value="1000"/>
                    <property name="multiplier" value="2"/>
                </bean>
            </property>
        </bean>
    </property>
</bean>
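
To make the timing implied by the retryAdvice bean concrete, here is a small plain-Java model of it. It does not call Spring Retry; the class and method names are hypothetical, and the 30000 ms cap is an assumption (check the maxInterval of the ExponentialBackOffPolicy in your Spring Retry version). It relies on maxAttempts counting the first call, so maxAttempts=2 means one retry and therefore one pause.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the retry timing configured above; it does not call Spring Retry. */
final class RetryScheduleSketch {

    /**
     * Returns the pauses (in ms) taken between attempts when every attempt fails.
     * maxAttempts includes the first call, so there are maxAttempts - 1 pauses.
     */
    static List<Long> pausesBetweenAttempts(int maxAttempts, long initialIntervalMs,
                                            double multiplier, long maxIntervalMs) {
        List<Long> pauses = new ArrayList<>();
        long interval = initialIntervalMs;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            // Each pause is capped, then the interval grows for the next one.
            pauses.add(Math.min(interval, maxIntervalMs));
            interval = (long) (interval * multiplier);
        }
        return pauses;
    }

    public static void main(String[] args) {
        // Values from the retryAdvice bean: maxAttempts=2, initialInterval=1000, multiplier=2.
        // The 30_000 ms cap is an assumption, not taken from the configuration above.
        System.out.println(pausesBetweenAttempts(2, 1000, 2.0, 30_000)); // prints [1000]
        // A hypothetical maxAttempts=4 shows the exponential growth.
        System.out.println(pausesBetweenAttempts(4, 1000, 2.0, 30_000)); // prints [1000, 2000, 4000]
    }
}
```

With the values in the question, a failing handler would be called twice with a single pause of about one second in between, after which the recoveryCallback sends the error to inventoryErrorChannel.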

Message endpoint

@MessageEndpoint
public class AddToCountWellHandler {

    public static final Logger logger = Logger.getLogger(AddToCountWellHandler.class);

    public Message<String> formatCountwellMessage(RxAddToCountWell payload) {
        //our logic here
        //...
        return MessageBuilder.withPayload(temp).build();
    }
}

Test method

@Test
@DirtiesContext
public void addToCountWellIntegrationTest() throws InterruptedException, SystemException, NotSupportedException, HeuristicRollbackException, HeuristicMixedException, RollbackException {

    // Send the message to the handler
    transactionManager.getTransactionManager().begin();
    jmsTemplate.send("queue.inventory", new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            TextMessage message;
            try {
                message = session.createTextMessage(getJson("./doc/event_json_examples/inventory/rxAddToCountWell.json"));
            } catch (Exception e) {
                //...
            }
            message.setJMSType("rxAddToCountWell");
            return message;
        }
    });
    transactionManager.getTransactionManager().commit();

    transactionManager.getTransactionManager().begin();

    //verify that it was placed on the queue
    TextMessage output = (TextMessage) jmsTemplate.receive(COUNTWELL_QUEUE_NAME);
    assertNotNull(output);

    transactionManager.getTransactionManager().commit();
    appContext.close();
}
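
One thing the test above does not handle: if the receive or the assertion fails between begin() and commit(), the transaction is never ended. The following is a minimal sketch of a helper that rolls back on failure. The Tx interface is a hypothetical stand-in for the begin/commit/rollback subset of a JTA TransactionManager, and RecordingTx exists only so the sketch runs without Atomikos; neither is part of the original test.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

final class TxScopeSketch {

    /** Hypothetical stand-in for the begin/commit/rollback subset of a JTA TransactionManager. */
    interface Tx {
        void begin() throws Exception;
        void commit() throws Exception;
        void rollback() throws Exception;
    }

    /** Runs work inside one transaction: commit if it returns, roll back if it throws. */
    static <T> T inTransaction(Tx tx, Callable<T> work) throws Exception {
        tx.begin();
        T result;
        try {
            result = work.call();
        } catch (Exception | Error failure) {
            // Errors are caught too, because a failed JUnit assertion is an AssertionError.
            try {
                tx.rollback();
            } catch (Exception rollbackFailure) {
                failure.addSuppressed(rollbackFailure);
            }
            throw failure;
        }
        tx.commit();
        return result;
    }

    /** Test double that records calls instead of talking to a real transaction manager. */
    static final class RecordingTx implements Tx {
        final List<String> events = new ArrayList<>();
        @Override public void begin() { events.add("begin"); }
        @Override public void commit() { events.add("commit"); }
        @Override public void rollback() { events.add("rollback"); }
    }

    public static void main(String[] args) throws Exception {
        RecordingTx tx = new RecordingTx();
        String received = inTransaction(tx, () -> "message");
        System.out.println(received + " " + tx.events); // prints message [begin, commit]

        try {
            inTransaction(tx, () -> { throw new AssertionError("nothing received"); });
        } catch (AssertionError expected) {
            // The failure still propagates; the transaction was rolled back first.
        }
        System.out.println(tx.events); // prints [begin, commit, begin, rollback]
    }
}
```

In the real test, an adapter implementing Tx would delegate to transactionManager.getTransactionManager(), and the send and receive steps would each be passed in as the work argument.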

Logs

2014-04-08 13:55:35,018 [INFO ] org.springframework.transaction.jta.JtaTransactionManager - Using JTA UserTransaction: com.atomikos.icatch.jta.UserTransactionImp@188c838 (org.springframework.transaction.jta.JtaTransactionManager.checkUserTransactionAndTransactionManager(JtaTransactionManager.java:471))
2014-04-08 13:55:35,019 [INFO ] org.springframework.transaction.jta.JtaTransactionManager - Using JTA TransactionManager: com.atomikos.icatch.jta.UserTransactionManager@111089b (org.springframework.transaction.jta.JtaTransactionManager.checkUserTransactionAndTransactionManager(JtaTransactionManager.java:482))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,171 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,172 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#3]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:36,171 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:37,619 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:37,631 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#2]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:38,656 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:38,658 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:39,667 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:39,670 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#4]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:40,705 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:40,706 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#1]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:40,748 [DEBUG] com.inventory.InventoryRouter - Routing jms_type: rxAddToCountWell (com.inventory.InventoryRouter.route(InventoryRouter.java:25))
2014-04-08 13:55:40,834 [DEBUG] org.springframework.retry.support.RetryTemplate - Retry: count=0 (org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:251))
2014-04-08 13:55:40,835 [DEBUG] com.inventory.AddToCountWellHandler - Entered formatCountwellMessage... (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:31))
2014-04-08 13:55:41,179 [DEBUG] com.inventory.AddToCountWellHandler - Leaving formatCountwellMessage (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:38))
2014-04-08 13:55:41,190 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:41,193 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#3]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 13:55:43,396 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:44,406 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:45,422 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:46,448 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 13:55:47,453 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))

Best answer

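I have figured out what was happening. When the test context configuration is loaded, it pulls in a file that imports all 5 message-driven channel adapters. As Gary said, each idle thread has to start a transaction. So every adapter started a transaction, but only one adapter actually received the message.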

I found that changing the test context to load only the specific adapter/route results in only a single thread being used.

2014-04-08 17:16:38,322 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 17:16:38,492 [DEBUG] com.inventory.InventoryRouter - Routing jms_type: rxAddToCountWell (com.inventory.InventoryRouter.route(InventoryRouter.java:25))
2014-04-08 17:16:38,555 [DEBUG] org.springframework.retry.support.RetryTemplate - Retry: count=0 (org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:251))
2014-04-08 17:16:38,595 [DEBUG] com.inventory.AddToCountWellHandler - Entered formatCountwellMessage... (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:31))
2014-04-08 17:16:42,198 [DEBUG] com.inventory.AddToCountWellHandler - Leaving formatCountwellMessage (com.inventory.AddToCountWellHandler.formatCountwellMessage(AddToCountWellHandler.java:38))
2014-04-08 17:16:42,205 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
2014-04-08 17:16:42,210 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Creating new transaction with name [org.springframework.jms.listener.DefaultMessageListenerContainer#0]: PROPAGATION_REQUIRED,ISOLATION_DEFAULT (org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:366))
2014-04-08 17:16:43,219 [DEBUG] org.springframework.transaction.jta.JtaTransactionManager - Initiating transaction commit (org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:753))
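
The counts in both logs can be reproduced with a small plain-Java model of the behavior described in this answer. It is only a sketch under stated assumptions: it uses no Spring or JMS classes, it runs the listeners sequentially rather than on separate threads, all class and method names are hypothetical, and the event strings are illustrative rather than real log output. Each listener wraps every poll in its own transaction, whether or not a message arrives.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Plain-Java model of transacted polling listeners; it does not use Spring or JMS. */
final class PollingTransactionSketch {

    /** Collects events, standing in for the JtaTransactionManager debug log. */
    static final class TxLog {
        final List<String> events = new ArrayList<>();

        long count(String prefix) {
            return events.stream().filter(e -> e.startsWith(prefix)).count();
        }
    }

    /** One poll cycle: begin a transaction, attempt a receive, handle any message, commit. */
    static void pollOnce(String listener, Queue<String> queue, TxLog log) {
        log.events.add("begin " + listener);
        String message = queue.poll(); // stands in for a timed receive that may return nothing
        if (message != null) {
            log.events.add("handle " + listener + " " + message);
        }
        log.events.add("commit " + listener);
    }

    /** Runs the poll cycles for every listener; only listener #0's queue holds a message. */
    static TxLog simulate(int listeners, int cycles) {
        List<Queue<String>> queues = new ArrayList<>();
        for (int i = 0; i < listeners; i++) {
            queues.add(new ArrayDeque<>());
        }
        queues.get(0).add("rxAddToCountWell");

        TxLog log = new TxLog();
        for (int cycle = 0; cycle < cycles; cycle++) {
            for (int i = 0; i < listeners; i++) {
                pollOnce("DefaultMessageListenerContainer#" + i, queues.get(i), log);
            }
        }
        return log;
    }

    public static void main(String[] args) {
        TxLog five = simulate(5, 2);
        // prints 10 begins, 10 commits, 1 handled
        System.out.println(five.count("begin") + " begins, " + five.count("commit")
                + " commits, " + five.count("handle") + " handled");

        TxLog one = simulate(1, 2);
        // prints 2 begins, 2 commits, 1 handled
        System.out.println(one.count("begin") + " begins, " + one.count("commit")
                + " commits, " + one.count("handle") + " handled");
    }
}
```

Five listeners polling twice give the 10 creates and 10 commits seen in the question's log, while a single listener polling twice gives the 2 creates and 2 commits in the log above. In both cases only one transaction actually carries a message; the rest are empty polls that commit when the receive returns nothing.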

A similar question about "java - Why am I seeing multiple transactions with Spring Integration/JtaTransactionManager" can be found on Stack Overflow: https://stackoverflow.com/questions/22946521/
