gpt4 book ai didi

spring - 使用 Spring、JOOQ、Postgres 和 ActiveMQ 时无回滚

转载 作者:行者123 更新时间:2023-11-29 13:26:35 24 4
gpt4 key购买 nike

数据库写入没有像我预期的那样回滚。我花了很多时间阅读软件文档和网络帖子。我无法解决这个问题。

我希望你们能帮助我。

场景

  • 我的应用程序从队列中提取消息,从
    消息,并将其写入数据库。
  • 写入数据库的方法执行 2 个 SQL 插入。
  • 第二个插入出现异常:org.postgresql.util.PSQLException: ERROR: duplicate key value violates unique constraint "table2_PK"
  • 但是,第一个插入仍在提交到数据库。

相关软件

  1. spring-boot 1.2.5.RELEASE
  2. atomikos-util 3.9.3(来自 spring-boot-starter-jta-atomikos 1.2.5.RELEASE)
  3. jooq 3.6.2
  4. postgresql 9.4-1201-jdbc41
  5. activemq-client 5.1.2

应用程序代码 - 我在下面粘贴了我的代码的相关部分。

  1. GdmServer - 我的“服务器”类,它还声明了 Spring bean配置
  2. PortSIQueue - 我的 JMS MessageListener 类
  3. 内核 - 我的工作类,即写入数据库的代码,一个由我的 MessageListener 调用的 Spring bean

如果有人能提供任何帮助,我将不胜感激。

谢谢


package com.sm.gis.gdm;

import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DSL;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;

@SpringBootApplication
@EnableJms
@EnableTransactionManagement
public class GdmServer {

@Autowired
ConfigurableApplicationContext context;
@Autowired
GisConfig gisConfig;

/**
* Starts the GDM Server
*/
public static void main(String[] args) {
SpringApplication.run(GdmServer.class, args);
}

// -------------------------------------------------------------------------
// Spring bean configurations
// -------------------------------------------------------------------------

@Bean
GisConfig gisConfig() {
return new GisConfig();
}

@Bean
PlatformTransactionManager transactionManager() throws SystemException {
JtaTransactionManager manager = new JtaTransactionManager();
manager.setTransactionManager( atomikosUserTransactionManager() );
manager.setUserTransaction ( atomikosUserTransaction() );
manager.setAllowCustomIsolationLevels(true);
return manager;
}

@Bean(initMethod = "init", destroyMethod = "close")
UserTransactionManager atomikosUserTransactionManager() throws SystemException {
UserTransactionManager manager = new UserTransactionManager();
manager.setStartupTransactionService(true);
manager.setForceShutdown(false);
manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
return manager;
}

@Bean
UserTransaction atomikosUserTransaction() {
return new UserTransactionImp();
}

@Bean(initMethod = "init", destroyMethod = "close")
AtomikosDataSourceBean atomikosJdbcConnectionFactory() {
PGXADataSource pgXADataSource = new PGXADataSource();
pgXADataSource.setUrl( gisConfig.getGdbUrl() );
pgXADataSource.setUser( gisConfig.getGdbUser() );
pgXADataSource.setPassword( gisConfig.getGdbPassword() );

AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(pgXADataSource);
xaDataSource.setUniqueResourceName("gdb");
xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
return xaDataSource;
}

@Bean
DSLContext dslContext() {
DSLContext dslContext = DSL.using(atomikosJdbcConnectionFactory(), SQLDialect.POSTGRES);
return dslContext;
}

@Bean(initMethod = "init", destroyMethod = "close")
AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory();
activeMQXAConnectionFactory.setBrokerURL( gisConfig.getMomBrokerUrl() );

AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
atomikosConnectionFactoryBean.setLocalTransactionMode(false);
return atomikosConnectionFactoryBean;
}

@Bean
DefaultMessageListenerContainer queueWrapperGDM() throws SystemException {
DefaultMessageListenerContainer messageSource = new DefaultMessageListenerContainer();
messageSource.setTransactionManager( transactionManager() );
messageSource.setConnectionFactory( atomikosJmsConnectionFactory() );
messageSource.setSessionTransacted(true);
messageSource.setConcurrentConsumers(1);
messageSource.setReceiveTimeout( gisConfig.getMomQueueGdmTimeoutReceive() );
messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
messageSource.setMessageListener( context.getBean("portSIQueue") );
return messageSource;
}

@Bean
JmsTemplate queueWrapperLIMS() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
jmsTemplate.setDefaultDestinationName( gisConfig.getMomQueueLimsName() );
jmsTemplate.setSessionTransacted(true);
return jmsTemplate;
}

}

package com.sm.gis.gdm.ports;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;

@Component
public class PortSIQueue implements MessageListener {

@Autowired
ConfigurableApplicationContext context;
@Autowired
GisMessageMarshaler queueMessageMashaler;
@Autowired
Kernel kernel;

@Override
@Transactional(rollbackFor = {Throwable.class})
public void onMessage(Message jmsMessage) {

TextMessage jmsTextMessage = (TextMessage) jmsMessage;

// Extract JMS message body...
String jmsPayload = "";
try {
jmsPayload = jmsTextMessage.getText();
} catch (JMSException e) {
throw new RuntimeException(e);
}

// Marshal XML text to object...
Object gisMessage = queueMessageMashaler.toObject(jmsPayload);

kernel.receiveCreateGenomicTestOrderInGIS( (CreateGenomicTestOrderInGIS) gisMessage );
}

}

package com.sm.gis.gdm.kernel;

import org.jooq.DSLContext;
import org.jooq.impl.DSL;

@Component
public class Kernel {

@Autowired
ConfigurableApplicationContext context;
@Autowired
DSLContext dslContext;

<snip>
public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {

dslContext.insertInto(table1)
.set(...)
.set(...)
.execute();

dslContext.insertInto(table2)
.set(...)
.set(...)
.execute();
}
<snip>
}

最佳答案

我是个傻子。原来这个问题是由于我的应用程序逻辑中的缺陷造成的。如果第一次尝试处理消息失败并出现异常,ActiveMQ 组件将重试该消息。为第一次尝试创建的事务正确回滚。这是第二次成功的尝试。重试成功是因为在第一次尝试期间应用程序逻辑增加了数据库序列号,而第二次尝试没有导致重复键冲突。纠正应用程序逻辑缺陷后,由于在我的应用程序中没有任何消息是可重试的,所以我也关闭了重试。对于浪费阅读我帖子的人的时间,我深表歉意。

在此过程中,我确实对实现进行了一些更改。这些更改使某些默认值成为明确的选择。我保留了这些更改,因为我相信它们将使我团队中的其他开发人员更容易更快地了解正在发生的事情。我还保留了 JOOQ 异常转换代码,因为在其他情况下会需要它,而且这似乎是最佳实践。

我已将修改后的代码包含在这篇文章中,以防其他人发现它有用。


package com.sm.gis.gdm;

import javax.transaction.SystemException;
import javax.transaction.UserTransaction;

import org.apache.activemq.ActiveMQXAConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.jooq.impl.DefaultConfiguration;
import org.jooq.impl.DefaultDSLContext;
import org.jooq.impl.DefaultExecuteListenerProvider;
import org.postgresql.xa.PGXADataSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.DependsOn;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.listener.DefaultMessageListenerContainer;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;
import org.springframework.transaction.jta.JtaTransactionManager;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.icatch.jta.UserTransactionManager;
import com.atomikos.jms.AtomikosConnectionFactoryBean;
import com.sm.gis.config.GisConfig;

@SpringBootApplication
@EnableJms
@EnableTransactionManagement(proxyTargetClass=true)
public class GdmServer {

@Autowired
ConfigurableApplicationContext context;
@Autowired
GisConfig gisConfig;

/**
* Starts the GDM Server
*/
public static void main(String[] args) {
SpringApplication.run(GdmServer.class, args);
}

// -------------------------------------------------------------------------
// Spring bean configurations
// -------------------------------------------------------------------------

@Bean
GisConfig gisConfig() {
return new GisConfig();
}

@Bean
@DependsOn({ "atomikosUserTransactionManager", "atomikosUserTransaction", "atomikosJdbcConnectionFactory", "atomikosJmsConnectionFactory" })
PlatformTransactionManager transactionManager() throws SystemException {
JtaTransactionManager manager = new JtaTransactionManager();
manager.setTransactionManager( atomikosUserTransactionManager() );
manager.setUserTransaction( atomikosUserTransaction() );
manager.setAllowCustomIsolationLevels(true);
manager.afterPropertiesSet();
return manager;
}

@Bean(initMethod = "init", destroyMethod = "close")
UserTransactionManager atomikosUserTransactionManager() throws SystemException {
UserTransactionManager manager = new UserTransactionManager();
manager.setStartupTransactionService(true);
manager.setForceShutdown(false);
manager.setTransactionTimeout( gisConfig.getTxnTimeout() );
return manager;
}

@Bean
UserTransaction atomikosUserTransaction() {
return new UserTransactionImp();
}

@Bean(initMethod = "init", destroyMethod = "close")
AtomikosDataSourceBean atomikosJdbcConnectionFactory() throws Exception {
PGXADataSource pgXADataSource = new PGXADataSource();
pgXADataSource.setUrl( gisConfig.getGdbUrl() );
pgXADataSource.setUser( gisConfig.getGdbUser() );
pgXADataSource.setPassword( gisConfig.getGdbPassword() );

AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(pgXADataSource);
xaDataSource.setUniqueResourceName("gdb");
xaDataSource.setPoolSize( gisConfig.getGdbPoolSize() );
xaDataSource.setTestQuery("SELECT 1");
xaDataSource.afterPropertiesSet();
return xaDataSource;
}

@Bean
@DependsOn({ "atomikosJdbcConnectionFactory" })
DSLContext dslContext() throws Exception {
DefaultConfiguration jooqConfiguration = new DefaultConfiguration();
jooqConfiguration.set( SQLDialect.POSTGRES_9_4 );
jooqConfiguration.set( atomikosJdbcConnectionFactory() );
jooqConfiguration.set( new DefaultExecuteListenerProvider(new JooqToSpringExceptionTransformer()) );
DSLContext dslContext = new DefaultDSLContext(jooqConfiguration);
return dslContext;
}


@Bean(initMethod = "init", destroyMethod = "close")
AtomikosConnectionFactoryBean atomikosJmsConnectionFactory() {
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setInitialRedeliveryDelay(0);
redeliveryPolicy.setRedeliveryDelay(0);
redeliveryPolicy.setUseExponentialBackOff(false);
redeliveryPolicy.setMaximumRedeliveries(0);

ActiveMQXAConnectionFactory activeMQXAConnectionFactory = new ActiveMQXAConnectionFactory();
activeMQXAConnectionFactory.setBrokerURL( gisConfig.getMomBrokerUrl() );
activeMQXAConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);

AtomikosConnectionFactoryBean atomikosConnectionFactoryBean = new AtomikosConnectionFactoryBean();
atomikosConnectionFactoryBean.setUniqueResourceName("activeMQBroker");
atomikosConnectionFactoryBean.setXaConnectionFactory(activeMQXAConnectionFactory);
atomikosConnectionFactoryBean.setLocalTransactionMode(false);
return atomikosConnectionFactoryBean;
}

@Bean
@DependsOn({ "transactionManager" })
DefaultMessageListenerContainer queueWrapperGDM() throws SystemException {
DefaultMessageListenerContainer messageSource = new DefaultMessageListenerContainer();
messageSource.setTransactionManager( transactionManager() );
messageSource.setConnectionFactory( atomikosJmsConnectionFactory() );
messageSource.setSessionTransacted(true);
messageSource.setSessionAcknowledgeMode(0);
messageSource.setConcurrentConsumers(1);
messageSource.setReceiveTimeout( gisConfig.getMomQueueGdmTimeoutReceive() );
messageSource.setDestinationName( gisConfig.getMomQueueGdmName() );
messageSource.setMessageListener( context.getBean("portSIQueue") );
messageSource.afterPropertiesSet();
return messageSource;
}

@Bean
@DependsOn({ "transactionManager" })
JmsTemplate queueWrapperLIMS() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory( atomikosJmsConnectionFactory() );
jmsTemplate.setDefaultDestinationName( gisConfig.getMomQueueLimsName() );
jmsTemplate.setSessionTransacted(true);
jmsTemplate.setSessionAcknowledgeMode(0);
return jmsTemplate;
}

}

package com.sm.gis.gdm.ports;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;

import com.sm.gis.gdm.kernel.Kernel;
import com.sm.gis.sdo.xml.marshaler.GisMessageMarshaler;
import com.sm.gis.sdo.xml.service.message.CreateGenomicTestOrderInGIS;

@Component
public class PortSIQueue implements MessageListener {

@Autowired
ConfigurableApplicationContext context;
@Autowired
GisMessageMarshaler queueMessageMashaler;
@Autowired
Kernel kernel;

@Override
@Transactional(propagation = Propagation.REQUIRES_NEW, rollbackFor = {Throwable.class})
public void onMessage(Message jmsMessage) {

TextMessage jmsTextMessage = (TextMessage) jmsMessage;

// Extract JMS message body...
String jmsPayload = "";
try {
jmsPayload = jmsTextMessage.getText();
} catch (JMSException e) {
throw new RuntimeException(e);
}

// Marshal XML text to object...
Object gisMessage = queueMessageMashaler.toObject(jmsPayload);

kernel.receiveCreateGenomicTestOrderInGIS( (CreateGenomicTestOrderInGIS) gisMessage );

}

package com.sm.gis.gdm.kernel;

import org.jooq.DSLContext;

@Component
public class Kernel {

@Autowired
ConfigurableApplicationContext context;
@Autowired
DSLContext dslContext;

<snip>
public void receiveCreateGenomicTestOrderInGIS(CreateGenomicTestOrderInGIS message) {

dslContext.insertInto(table1)
.set(...)
.set(...)
.execute();

dslContext.insertInto(table2)
.set(...)
.set(...)
.execute();
}
<snip>
}

关于spring - 使用 Spring、JOOQ、Postgres 和 ActiveMQ 时无回滚,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/32797078/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com