gpt4 book ai didi

activemq - Apache TomEE 外部 ActiveMQ 资源未在分布式事务中回滚

转载 作者:行者123 更新时间:2023-12-04 17:30:05 29 4
gpt4 key购买 nike

我正在尝试在 Apache TomEE 中实现分布式事务。换言之,流程是:

  • 消息读取器(即消息驱动 bean)从队列 (1) 中读取并处理一个消息触发:
    • 一个或多个数据库的插入/更新 (2)
    • 向另一个队列发送消息 (3)

操作 1、2 和 3 都是由 TomEE 控制的同一 XA 事务的一部分。因此,在任何情况下,他们要么全部失败,要么全部成功。

tomee.xml

<?xml version="1.0" encoding="UTF-8"?>
<tomee>
this resource adapter is just necessary to tell tomee to not start internal ActiveMq instance
<Resource id="MyAdapter" type="ActiveMQResourceAdapter">
BrokerXmlConfig
ServerUrl tcp://fakehost:666
</Resource>

<Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
</Resource>

<Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory" class-name="org.apache.activemq.ActiveMQXAConnectionFactory">
BrokerURL tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
</Resource>

<Resource id="jms/MyOutgoingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
PhysicalName MY_OUTGOING_QUEUE
</Resource>

<Resource id="jms/MyIncomingQueue" class-name="org.apache.activemq.command.ActiveMQQueue">
PhysicalName MY_INCOMING_QUEUE
</Resource>

<Resource id="jdbc/myDBXAPooled" type="DataSource">
XaDataSource myDBXA
DataSourceCreator dbcp
JtaManaged true
UserName TestUser
Password TestPassword
MaxWait 2000
ValidationQuery SELECT 1
MaxActive 15
</Resource>

<Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
Url jdbc:mysql://localhost:3306/test
User TestUser
Password TestPassword
</Resource>
</tomee>

Springconfig.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">


<!-- <jee:jndi-lookup jndi-name="myDBXAPooled" id="myDatasource" resource-ref="true" /> -->
<jee:jndi-lookup jndi-name="jms/MyOutgoingConnFactory" id="myOutgoingConnFactory" resource-ref="true" />
<jee:jndi-lookup jndi-name="jms/MyIncomingConnFactory" id="myIncomingConnFactory" resource-ref="true" />
<jee:jndi-lookup jndi-name="jms/MyOutgoingQueue" id="myOutgoingQueue" resource-ref="true" />
<jee:jndi-lookup jndi-name="jms/MyIncomingQueue" id="myIncomingQueue" resource-ref="true" />
<jee:jndi-lookup jndi-name="jdbc/myDBXAPooled" id="myDatasource" resource-ref="true" />

<tx:jta-transaction-manager/>
<!-- <bean id="transactionManager" class="org.springframework.transaction.jta.JtaTransactionManager"/> -->
<!-- the previous two ways of getting the transactionManager seems equivalent and both get Geronimo -->


</beans>

SpringConfig.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jee="http://www.springframework.org/schema/jee"
xmlns:tx="http://www.springframework.org/schema/tx"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.2.xsd
http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.2.xsd
http://www.springframework.org/schema/jee http://www.springframework.org/schema/jee/spring-jee-4.2.xsd">


<bean id="messageListener" class="com.test.MyListener">
<property name="connectionFactory" ref="myIncomingConnFactory" />
<property name="destination" ref="myIncomingQueue" />
<!-- <property name="sessionTransacted" value="true" /> -->
<property name="concurrentConsumers" value="1" />
<property name="maxConcurrentConsumers" value="6" />
<property name="messageListener" ref="myMessageProcessor" />
<property name="transactionManager" ref="transactionManager" />
<property name="taskExecutor" ref="msgListenersTaskExecutor" />
</bean>

<bean id="myMessageProcessor" class="com.test.MyMessageReceiver">
<property name="forwardConnectionFactory" ref="myOutgoingConnFactory" />
<property name="forwardQueue" ref="myOutgoingQueue" />
<property name="datasource" ref="myDatasource" />
</bean>

<bean id="msgListenersTaskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor"/>



</beans>

MyMessageReceiver.java:

package com.test;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.sql.DataSource;

import org.apache.log4j.Logger;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

public class MyMessageReceiver implements MessageListener {

static Logger log = Logger.getLogger(MyMessageReceiver.class);

private ConnectionFactory forwardConnectionFactory;
private Queue forwardQueue;
private DataSource datasource;

public void setForwardConnectionFactory(ConnectionFactory connFactory) {
forwardConnectionFactory=connFactory;
}
public void setforwardQueue(Queue queue) {
forwardQueue=queue;
}
public void setDatasource(DataSource ds) {
datasource=ds;
}

@Override
@Transactional(propagation=Propagation.REQUIRED)
public void onMessage(Message message) {

log.info("************************************");
MyListener listener = (MyListener)SpringContext.getBean("messageListener");
listener.printInfo();
log.info("************************************");

TextMessage msg = (TextMessage) message;
String text = null;
try {
text = msg.getText();

if (text != null) log.info("MESSAGE RECEIVED: "+ text);

updateDB(text); // function call to update DB

sendMsg(text); // function call to publish messages to queue

System.out.println("****************Rollback");
// Throwing exception to rollback DB, Message should not be
// published and consumed message sent to a DLQ
//(Broker side DLQ configuration already done)
throw new RuntimeException();
//if (text!=null && text.indexOf("rollback")!=-1) throw new RuntimeException("Message content includes the word rollback");

} catch (Exception e) {
log.error("Rolling back the entire XA transaction");
log.error(e.getMessage());
throw new RuntimeException("Rolled back because of "+e.getMessage());
}

}

private void updateDB(String text) throws Exception {

Connection conn = null;
PreparedStatement ps = null;
try {
System.out.println("*******datasource "+datasource);
conn = datasource.getConnection();
System.out.println("*******conn "+conn.getMetaData().getUserName());
if (conn!=null) {
System.out.println("*******conn "+conn.getMetaData().getUserName());
ps = conn.prepareStatement("INSERT INTO MY_TABLE (name) VALUES(?)");
ps.setString(1, text);
ps.executeUpdate();
}
} catch (Exception e) {
throw e;
} finally {
if (ps!=null) {
try {
ps.close();
} catch (SQLException e) {
log.error(e.getMessage());
// do nothing
}
}
if (conn!=null) {
try {
conn.close();
} catch (SQLException e) {
log.error(e.getMessage());
// do nothing
}
}
}

}

private void sendMsg(String msgToBeSent) throws Exception {

javax.jms.Connection conn = null;
Session session = null;
try {
System.out.println("*************forwardConnectionFactory"+forwardConnectionFactory);
conn = forwardConnectionFactory.createConnection();
session = conn.createSession(true, Session.AUTO_ACKNOWLEDGE);
MessageProducer messageProducer = session.createProducer(forwardQueue);
TextMessage msg = session.createTextMessage(msgToBeSent);
messageProducer.send(msg);

} catch (Exception e) {
throw e;
} finally {
if (session != null) {
try {
session.close();
} catch (JMSException e) {
// do nothing
}
}
if (conn != null) {
try {
conn.close();
} catch (JMSException e) {
// do nothing
}
}
}
}

}

我的监听器.java:

package com.test;

import javax.transaction.Status;
import javax.transaction.SystemException;

import org.apache.log4j.Logger;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.jta.JtaTransactionManager;

public class MyListener extends DefaultMessageListenerContainer {

static Logger log = Logger.getLogger(MyListener.class);

public void printInfo() {

try {

log.info("trans manager="+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager()+","+((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().getStatus()+", this.isSessionTransacted()="+this.isSessionTransacted());
log.info("STATUS_ACTIVE="+Status.STATUS_ACTIVE);
log.info("STATUS_COMMITTEDE="+Status.STATUS_COMMITTED);
log.info("STATUS_COMMITTING="+Status.STATUS_COMMITTING);
log.info("STATUS_MARKED_ROLLBACK="+Status.STATUS_MARKED_ROLLBACK);
log.info("STATUS_NO_TRANSACTION="+Status.STATUS_NO_TRANSACTION);
log.info("STATUS_PREPARED="+Status.STATUS_PREPARED);
log.info("STATUS_PREPARING="+Status.STATUS_PREPARING);
log.info("STATUS_ROLLEDBACK="+Status.STATUS_ROLLEDBACK);
log.info("STATUS_ROLLING_BACK="+Status.STATUS_ROLLING_BACK);
log.info("STATUS_UNKNOWN="+Status.STATUS_UNKNOWN);



} catch (SystemException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}

}

public void forceRollback() {
try {
((JtaTransactionManager)this.getTransactionManager()).getTransactionManager().setRollbackOnly();
} catch (IllegalStateException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SecurityException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (SystemException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}

}

更新数据库并将消息发送到传出队列后,我故意抛出一个 RuntimeException 来测试数据库和消息代理的事务回滚。

这三个操作都在成功的情况下提交,但它只在失败的情况下回滚数据库操作,而两个 JMS 操作无论如何都会提交。

它可能是:

  • 我的 tomee.xml(尤其是队列连接工厂)或其他地方的设置错误
  • 错误??

我已经花了很多时间与这件事作斗争并寻找可能的解决方案。

很高兴听到您对此的意见,如果结果是我这边的错误,再次深表歉意。

最佳答案

我相信您需要使用 ActiveMQ JCA 资源适配器来确保连接自动登记到 XA 事务中。试试这个:

<tomee>
<Resource id="MyJmsResourceAdapter" type="ActiveMQResourceAdapter">
# Do not start the embedded ActiveMQ broker
BrokerXmlConfig =
ServerUrl = tcp://externalhost:61616?jms.redeliveryPolicy.maximumRedeliveries=0
</Resource>

<Resource id="jms/MyIncomingConnFactory" type="javax.jms.ConnectionFactory">
resourceAdapter = MyJmsResourceAdapter
transactionSupport = xa
</Resource>

<Resource id="jms/MyOutgoingConnFactory" type="javax.jms.ConnectionFactory">
resourceAdapter = MyJmsResourceAdapter
transactionSupport = xa
</Resource>

<Resource id="jms/MyOutgoingQueue" type="javax.jms.Queue"/>
<Resource id="jms/MyIncomingQueue" type="javax.jms.Queue"/>

<Resource id="jdbc/myDBXAPooled" type="DataSource">
XaDataSource myDBXA
DataSourceCreator dbcp
JtaManaged true
UserName TestUser
Password TestPassword
MaxWait 2000
ValidationQuery SELECT 1
MaxActive 15
</Resource>

<Resource id="myDBXA" type="XADataSource" class-name="com.mysql.jdbc.jdbc2.optional.MysqlXADataSource">
Url jdbc:mysql://localhost:3306/test
User TestUser
Password TestPassword
</Resource>
</tomee>

关于activemq - Apache TomEE 外部 ActiveMQ 资源未在分布式事务中回滚,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60627955/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com