作者热门文章
- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我正在尝试在 Apache TomEE 中实现分布式事务。换言之,流程是:
1. 消息监听器从传入队列(MY_INCOMING_QUEUE)读取一条消息;
2. 处理该消息,并将结果写入数据库;
3. 将另一条消息发送到传出队列(MY_OUTGOING_QUEUE)。
操作 1、2 和 3 都是由 TomEE 控制的同一 XA 事务的一部分。因此,在任何情况下,他们要么全部失败,要么全部成功。
tomee.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  TomEE container resources for the XA test setup described in the question:
  two JMS connection factories and two queues pointing at an external ActiveMQ
  broker, plus a pooled, JTA-managed MySQL data source backed by an XADataSource.
  The text inside each <Resource> element is in Java-properties format.
-->
<tomee>
  <!-- this resource adapter is just necessary to tell tomee to not start internal ActiveMq instance -->
  <Resource id="MyAdapter" type="ActiveMQResourceAdapter">
    BrokerXmlConfig
    ServerUrl tcp://fakehost:666
  </Resource>

  <!--
    Both connection factories are instantiated directly from the class named in
    class-name; neither one references the resource adapter declared above.
    NOTE(review): confirm whether sessions created this way are enlisted in the
    container's JTA transaction.
  -->
  <Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
    BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
  </Resource>
  <Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
    BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
  </Resource>

  <!-- Queue objects bound to the broker-side queue names given in PhysicalName. -->
  <Resource id="jms/MyOutgoingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
    PhysicalName MY_OUTGOING_QUEUE
  </Resource>
  <Resource id="jms/MyIncomingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
    PhysicalName MY_INCOMING_QUEUE
  </Resource>

  <!-- Pooled data source handed to the application; it delegates to the XADataSource "myDBXA" below. -->
  <Resource id="jdbc/myDBXAPooled" type="DataSource">
    XaDataSource myDBXA
    DataSourceCreator dbcp
    JtaManaged true
    UserName TestUser
    Password TestPassword
    MaxWait 2000
    ValidationQuery SELECT 1
    MaxActive 15
  </Resource>

  <!-- Raw MySQL XA data source referenced by XaDataSource above. -->
  <Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
    Url jdbc:mysql://localhost:3306/test
    User TestUser
    Password TestPassword
  </Resource>
</tomee>
Springconfig.xml:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring context, part 1: pulls the container-managed JMS and JDBC resources
  out of JNDI and declares the JTA transaction manager.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans
         http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
         http://www.springframework.org/schema/tx
         http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
         http://www.springframework.org/schema/jee
         http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">

  <!-- JMS connection factories (one per direction). -->
  <jee:jndi-lookup id="myOutgoingConnFactory"
                   jndi-name="jms/MyOutgoingConnFactory"
                   resource-ref="true" />
  <jee:jndi-lookup id="myIncomingConnFactory"
                   jndi-name="jms/MyIncomingConnFactory"
                   resource-ref="true" />

  <!-- JMS destinations. -->
  <jee:jndi-lookup id="myOutgoingQueue"
                   jndi-name="jms/MyOutgoingQueue"
                   resource-ref="true" />
  <jee:jndi-lookup id="myIncomingQueue"
                   jndi-name="jms/MyIncomingQueue"
                   resource-ref="true" />

  <!--
    Pooled data source. An earlier variant of this lookup used the bare name
    "myDBXAPooled" (without the jdbc/ prefix); it is superseded by this one.
  -->
  <jee:jndi-lookup id="myDatasource"
                   jndi-name="jdbc/myDBXAPooled"
                   resource-ref="true" />

  <!--
    JTA transaction manager. Author's note: declaring an explicit
    org.springframework.transaction.jta.JtaTransactionManager bean with
    id "transactionManager" appeared to be equivalent to this element, and
    both variants ended up using the Geronimo transaction manager.
  -->
  <tx:jta-transaction-manager />

</beans>
SpringConfig.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Spring context, part 2: the message listener container, the listener that
  processes each message, and the task executor that runs the consumer threads.
  The beans referenced by name but not defined here (myIncomingConnFactory,
  myIncomingQueue, myOutgoingConnFactory, myOutgoingQueue, myDatasource,
  transactionManager) are expected to come from another context file.
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:jee="http://www.springframework.org/schema/jee"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xsi:schemaLocation="
         http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
         http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
         http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">

  <!-- Listener container (com.test.MyListener extends DefaultMessageListenerContainer). -->
  <bean id="messageListener" class="com.test.MyListener">
    <property name="connectionFactory" ref="myIncomingConnFactory" />
    <property name="destination" ref="myIncomingQueue" />
    <!-- <property name="sessionTransacted" value="true" /> -->
    <property name="concurrentConsumers" value="1" />
    <property name="maxConcurrentConsumers" value="6" />
    <property name="messageListener" ref="myMessageProcessor" />
    <property name="transactionManager" ref="transactionManager" />
    <property name="taskExecutor" ref="msgListenersTaskExecutor" />
  </bean>

  <!-- Per-message logic: writes to the database and forwards the text to the outgoing queue. -->
  <bean id="myMessageProcessor" class="com.test.MyMessageReceiver">
    <property name="forwardConnectionFactory" ref="myOutgoingConnFactory" />
    <property name="forwardQueue" ref="myOutgoingQueue" />
    <property name="datasource" ref="myDatasource" />
  </bean>

  <!--
    Each consumer of the listener container occupies one executor thread for as
    long as it is running. A ThreadPoolTaskExecutor with default settings keeps
    a single core thread and parks every further task in an unbounded queue, so
    consumers beyond the first would never start. Size the core pool to match
    maxConcurrentConsumers above.
  -->
  <bean id="msgListenersTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <property name="corePoolSize" value="6" />
  </bean>

</beans>
MyMessageReceiver.java:
package com.test;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;
import org.apache.log4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
/**
 * JMS listener that, for every incoming text message, inserts the text into
 * {@code MY_TABLE} through the injected {@link DataSource} and then forwards
 * the same text to the injected outgoing queue.
 *
 * <p>In its current form {@link #onMessage(Message)} always ends by throwing a
 * {@link RuntimeException}: the author uses this class to test whether the
 * database update and both JMS operations are rolled back together.
 */
public class MyMessageReceiver implements MessageListener {
    static Logger log = Logger.getLogger(MyMessageReceiver.class);

    private ConnectionFactory forwardConnectionFactory;
    private Queue forwardQueue;
    private DataSource datasource;

    /**
     * @param connFactory connection factory used to open the connection on
     *                    which forwarded messages are sent
     */
    public void setForwardConnectionFactory(ConnectionFactory connFactory) {
        forwardConnectionFactory=connFactory;
    }

    /**
     * @param queue destination that forwarded messages are sent to
     */
    public void setforwardQueue(Queue queue) {
        forwardQueue=queue;
    }

    /**
     * @param ds data source used for the insert into {@code MY_TABLE}
     */
    public void setDatasource(DataSource ds) {
        datasource=ds;
    }

    /**
     * Handles one incoming message: logs transaction diagnostics, writes the
     * message text to the database, forwards it to the outgoing queue and then
     * deliberately fails.
     *
     * @param message the received message; it is cast to {@link TextMessage}
     * @throws RuntimeException always - either wrapping a failure of the
     *         database/JMS work or wrapping the deliberate test failure; the
     *         original exception is attached as the cause
     */
    @Override
    @Transactional(propagation=Propagation.REQUIRED)
    public void onMessage(Message message) {
        log.info("************************************");
        MyListener listener = (MyListener)SpringContext.getBean("messageListener");
        listener.printInfo();
        log.info("************************************");
        TextMessage msg = (TextMessage) message;
        String text = null;
        try {
            text = msg.getText();
            if (text != null) log.info("MESSAGE RECEIVED: "+ text);
            updateDB(text); // function call to update DB
            sendMsg(text); // function call to publish messages to queue
            System.out.println("****************Rollback");
            // Throwing exception to rollback DB, Message should not be
            // published and consumed message sent to a DLQ
            //(Broker side DLQ configuration already done)
            throw new RuntimeException();
            //if (text!=null && text.indexOf("rollback")!=-1) throw new RuntimeException("Message content includes the word rollback");
        } catch (Exception e) {
            // Log with the throwable so the stack trace is not lost, and keep
            // the original exception as the cause of the one we rethrow.
            log.error("Rolling back the entire XA transaction", e);
            throw new RuntimeException("Rolled back because of "+e.getMessage(), e);
        }
    }

    /**
     * Inserts {@code text} as the {@code name} column of a new row in
     * {@code MY_TABLE}.
     *
     * @param text value bound to the single statement parameter
     * @throws Exception any failure while obtaining the connection or
     *         executing the statement; failures while closing are only logged
     */
    private void updateDB(String text) throws Exception {
        Connection conn = null;
        PreparedStatement ps = null;
        try {
            System.out.println("*******datasource "+datasource);
            conn = datasource.getConnection();
            // The connection is only dereferenced after the null check.
            if (conn!=null) {
                System.out.println("*******conn "+conn.getMetaData().getUserName());
                ps = conn.prepareStatement("INSERT INTO MY_TABLE (name) VALUES(?)");
                ps.setString(1, text);
                ps.executeUpdate();
            }
        } finally {
            if (ps!=null) {
                try {
                    ps.close();
                } catch (SQLException e) {
                    // best effort: a failed close must not mask the real outcome
                    log.error("Failed to close the prepared statement", e);
                }
            }
            if (conn!=null) {
                try {
                    conn.close();
                } catch (SQLException e) {
                    // best effort: a failed close must not mask the real outcome
                    log.error("Failed to close the JDBC connection", e);
                }
            }
        }
    }

    /**
     * Sends {@code msgToBeSent} as a text message to the forward queue, using
     * a new connection and a session created with {@code transacted = true}.
     *
     * @param msgToBeSent body of the text message to send
     * @throws Exception any failure while creating the connection, session or
     *         producer, or while sending; failures while closing are only logged
     */
    private void sendMsg(String msgToBeSent) throws Exception {
        javax.jms.Connection conn = null;
        Session session = null;
        try {
            System.out.println("*************forwardConnectionFactory"+forwardConnectionFactory);
            conn = forwardConnectionFactory.createConnection();
            session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer = session.createProducer(forwardQueue);
            TextMessage msg = session.createTextMessage(msgToBeSent);
            messageProducer.send(msg);
        } finally {
            if (session != null) {
                try {
                    session.close();
                } catch (JMSException e) {
                    // best effort, but leave a trace like updateDB does
                    log.error("Failed to close the JMS session", e);
                }
            }
            if (conn != null) {
                try {
                    conn.close();
                } catch (JMSException e) {
                    // best effort, but leave a trace like updateDB does
                    log.error("Failed to close the JMS connection", e);
                }
            }
        }
    }
}
MyListener.java:
package com.test;
import javax.transaction.Status;
import javax.transaction.SystemException;
import org.apache.log4j.Logger;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.jta.JtaTransactionManager;
public class MyListener extends DefaultMessageListenerContainer {
static Logger log = Logger.getLogger(MyListener.class);
public void printInfo() {
try {
log.info("trans manager="+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager()+","+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().getStatus()+", this.isSessionTransacted()="+this.isSessionTransacted());
log.info("STATUS_ACTIVE="+Status.STATUS_ACTIVE);
log.info("STATUS_COMMITTEDE="+Status.STATUS_COMMITTED);
log.info("STATUS_COMMITTING="+Status.STATUS_COMMITTING);
log.info("STATUS_MARKED_ROLLBACK="+Status.STATUS_MARKED_ROLLBACK);
log.info("STATUS_NO_TRANSACTION="+Status.STATUS_NO_TRANSACTION);
log.info("STATUS_PREPARED="+Status.STATUS_PREPARED);
log.info("STATUS_PREPARING="+Status.STATUS_PREPARING);
log.info("STATUS_ROLLEDBACK="+Status.STATUS_ROLLEDBACK);
log.info("STATUS_ROLLING_BACK="+Status.STATUS_ROLLING_BACK);
log.info("STATUS_UNKNOWN="+Status.STATUS_UNKNOWN);
} catch (SystemException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void forceRollback() {
try {
((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().setRollbackOnly();
} catch (IllegalStateException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SystemException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
更新数据库并将消息发送到传出队列后,我故意抛出一个 RuntimeException
来测试数据库和消息代理的事务回滚。
成功时,这三个操作都会被提交;但失败时,只有数据库操作被回滚,而两个 JMS 操作(消费传入消息和发送传出消息)仍然会被提交。
它可能是:
我已经花了很多时间与这件事作斗争并寻找可能的解决方案。
很高兴听到您对此的意见,如果结果是我这边的错误,再次深表歉意。
最佳答案
我相信您需要使用 ActiveMQ JCA 资源适配器来确保连接自动登记到 XA 事务中。试试这个:
<?xml version="1.0" encoding="UTF-8"?>
<!--
  Suggested tomee.xml: the JMS connection factories are created through the
  ActiveMQ resource adapter (with XA transaction support) instead of being
  instantiated directly from an ActiveMQ class. The text inside each
  <Resource> element is in Java-properties format.
-->
<tomee>
  <Resource id="MyJmsResourceAdapter" type="ActiveMQResourceAdapter">
    # Do not start the embedded ActiveMQ broker
    BrokerXmlConfig =
    ServerUrl = tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
  </Resource>

  <!-- Connection factories bound to the resource adapter above. -->
  <Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory">
    resourceAdapter = MyJmsResourceAdapter
    transactionSupport = xa
  </Resource>
  <Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory">
    resourceAdapter = MyJmsResourceAdapter
    transactionSupport = xa
  </Resource>

  <!--
    Keep the broker-side queue names used by the original configuration
    (MY_OUTGOING_QUEUE / MY_INCOMING_QUEUE) rather than leaving the name unset.
    NOTE(review): "destination" is the TomEE property for the physical queue
    name - confirm against the TomEE version in use.
  -->
  <Resource id="jms/MyOutgoingQueue" type="javax.jms.Queue">
    destination = MY_OUTGOING_QUEUE
  </Resource>
  <Resource id="jms/MyIncomingQueue" type="javax.jms.Queue">
    destination = MY_INCOMING_QUEUE
  </Resource>

  <!-- Pooled, JTA-managed data source delegating to the XADataSource "myDBXA" below. -->
  <Resource id="jdbc/myDBXAPooled" type="DataSource">
    XaDataSource = myDBXA
    DataSourceCreator = dbcp
    JtaManaged = true
    UserName = TestUser
    Password = TestPassword
    MaxWait = 2000
    ValidationQuery = SELECT 1
    MaxActive = 15
  </Resource>

  <!-- Raw MySQL XA data source referenced by XaDataSource above. -->
  <Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
    Url = jdbc:mysql://localhost:3306/test
    User = TestUser
    Password = TestPassword
  </Resource>
</tomee>
关于activemq - Apache TomEE 外部 ActiveMQ 资源未在分布式事务中回滚,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60627955/
我是一名优秀的程序员,十分优秀!