- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
我正在使用 Spring Integration 4.1.5 并尝试对事务执行某些操作,但不幸的是我无法也找不到工作示例。我正在尝试设置一个正在查找消息的 JMS 轮询器。一旦收到消息,服务激活器将在数据库中插入一行,并将该消息传递给另一个服务激活器。我想让前两部分成为事务性的消息拾取和数据库插入。我不希望其余流程是事务性的。我使用 Weblogic 作为应用程序容器,因此将使用 WebLogicJtaTransactionManager。
我遇到的问题是我无法使前两部分成为事务性的。要么全有,要么什么都没有。我尝试了很多方法,但我觉得在轮询器上使用建议链是最好的选择。我将能够控制哪些方法将成为事务的一部分。
我见过使用消息驱动监听器的示例,但我正在使用 Weblogic 并且将使用工作管理器,并且我相信我必须使用轮询器才能利用工作管理器(如果情况并非如此,我想这将是 future 的另一个问题!)
我已经获取了 xml 并对其进行了简化,但是除了编辑包名称之外,上下文还会产生问题。
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"
xmlns:file="http://www.springframework.org/schema/integration/file"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
xmlns:int-jms="http://www.springframework.org/schema/integration/jms"
xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/sftp
http://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/integration/xml
http://www.springframework.org/schema/integration/xml/spring-integration-xml.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd">
<bean id="jtaTransactionManager"
class="org.springframework.transaction.jta.WebLogicJtaTransactionManager">
<property name="transactionManagerName" value="javax.transaction.TransactionManager" />
</bean>
<bean id="insertMessageToDb" class="com.ReadMsgFromAxway" />
<bean id="serviceActivator" class="com.CreateTMDFile" />
<int-jms:inbound-channel-adapter id="jmsDefaultReceiver"
connection-factory="inboundDefaultAdaptorConnectionFactory"
extract-payload="false" destination="inboundAdaptorDefaultListenerQueue"
channel="inboundJMS" acknowledge="transacted">
<int:poller id="poller"
max-messages-per-poll="100" fixed-rate="10">
<int:advice-chain>
<ref bean="txAdvice" />
</int:advice-chain>
</int:poller>
</int-jms:inbound-channel-adapter>
<tx:advice id="txAdvice" transaction-manager="jtaTransactionManager">
<tx:attributes>
<tx:method name="processMessage" propagation="REQUIRED"/>
</tx:attributes>
</tx:advice>
<aop:config>
<aop:pointcut id="txOperation"
expression="execution(* axway.ReadMsgFromAxway.processMessage(..))"/>
<aop:advisor advice-ref="txAdvice" pointcut-ref="txOperation" />
</aop:config>
<int:service-activator input-channel="inboundJMS"
output-channel="serviceActivatorChannel" ref="insertMessageToDb" method="processMessage" />
<int:chain input-channel="serviceActivatorChannel" output-channel="nullChannel">
<int:service-activator ref="serviceActivator" />
</int:chain>
</beans>
ReadMsgFromAxway.java
public Message<File> processMessage(Message<?> message) {
//Insert into DB
trackerProcess.insertUpdateMessageTracker(message, "axwayChannel",
"axwayChannel", currentStatusID, null, null);
count++;
int mod = count % 2;
if (mod != 0) {
// pass every 2
String hello = "hey";
} else {
throw new RuntimeException("Testing transactional");
}
Message<File> springMessage = MessageBuilder.createMessage(payloadFile,
messageHeaders);
return springMessage;
}
无论是抛出运行时异常,还是在下一个服务激活器组件处抛出异常,XML 本身都不执行任何操作。
如果我将建议属性更改为
<tx:method name="*" propagation="REQUIRED"/>
然后第一个和第二个服务激活器处的异常导致回滚。
奇怪的是,如果我这样做
<tx:method name="processMessage" propagation="REQUIRED"/>
<tx:method name="*" propagation="NEVER"/>
然后,无论是第一个服务激活器还是第二个服务激活器中抛出运行时异常,消息都会回滚。我认为切入点会限制哪个类会导致事务,但我可能误解了一些东西。
另一个注意事项 - 此应用程序与其他几个 war 一起存放在一个 Ear 文件中。该上下文启动整个入站流程,并通过 JMS 队列连接到另一个包含业务逻辑的 war。在使用 method name="*"的场景中,我看到业务逻辑 war 中的异常导致原始入站消息的 JMS 消息也被回滚。我的印象是第二次 war 将在另一个线程中进行处理,因为它通过队列接收消息,因此不属于事务的一部分。这可能是容器管理的 JTA 的副作用吗?
谢谢!
最佳答案
我建议您阅读 Dave Syer 的 article关于交易,您可以在“更多类似内容”中找到链接。
现在看起来你根本不了解事务和AOP。你应该多关注Spring Framework中的AOP支持.
一般来说,声明性事务(方法上的 @Transactional
)是 AOP 建议的特殊情况。任何 AOP 背后的主要概念是由我们指定建议的方法调用产生的调用堆栈边界。
但是如果目标对象中没有这样的方法,则不会将其建议并包装到 AOP 代理。就像您的 processMessage
的情况一样当您应用txAdvice
时对于 org.springframework.messaging.Message.MessageSource
周围的一些内部对象作为 JmsDestinationPollingSource
的契约(Contract)为此<int-jms:inbound-channel-adapter>
.
您可以使用 <transactional>
<poller>
的配置。右图:所有下游流量都将被交易覆盖。
在这种情况下完成该内部的交易
Callable<Boolean> pollingTask = new Callable<Boolean>() {
@Override
public Boolean call() throws Exception {
return doPoll();
}
};
大约receive()
和handleMessage()
,您应该像常规 @Transactional
一样完成方法调用。案件。为此,我们应该将下一个消息处理交给不同的线程。只是因为默认情况下所有 <channel>
是 DirectChannel
并与当前调用堆栈绑定(bind)。
为此,您可以使用 ExecutorChannel
( <int:dispatcher task-executor="threadPoolExecutor"/>
)为您的 serviceActivatorChannel
.
从那里您不需要其余的 AOP 配置。
不确定您针对其他网络应用程序的第二个问题。看起来它有一些逻辑可以在你这边出现一些不一致的情况下回滚它的工作。但无论如何,这看起来像是一个不同的问题。
关于java - JMS 轮询器事务,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37017917/
我正在读这个 question和 corresponding answer并被答案第一行中的术语 JMS broker 弄糊涂了: MS (ActiveMQ is a JMS broker imple
我正在学习 API 中的 Reactive Streams,我对它与 JMS 之间的相似性感到震惊。在 JMS 中,我们也有异步处理、发布者和订阅者。我在进行这种等效时缺少什么观点? 最佳答案 Rea
假设生产者向 JMS 主题“新闻”发送一条消息。消费者 1 读取了消息,但消费者 2 处于离线状态,因此他还没有读取消息。 是否有任何内置(针对规范或实现)的方式来通知生产者消费者 1 已阅读他的消息
目前我正在开发一个 JMS 应用程序。但我使用普通的 JMS API 和属性文件进行配置。我的应用程序在 Weblogic 中运行并连接到我客户端的 MQ 系列服务器。 最近我知道我可以使用 Webl
我正在尝试使用 Solace 中可用的异步发送功能,但我打算使用 JMS 进行抽象,而不是直接使用 JCSMP 使用它。 JMS 2.0 支持异步发送以及其他新功能:http://www.oracle
我无法获得 javax.jms.ConnectionFactory注入(inject)我的独立 JMS 客户端。 我得到一个 java.lang.NullPointerException在 conne
保持 JMS 连接/ session /消费者始终打开是一种不好的做法吗? 代码草稿示例: // app startup code ConnectionFactory cf = (Connection
我有几个作业,每个作业都有多条消息排队。每个作业的消息随机交错。如果用户决定取消作业,我想从队列中删除属于该作业的所有消息。我已经能够使用 browse() 找到所有要删除的消息,但一直无法弄清楚如何
是否可以将主题配置为仅存储最后一条消息的副本并将其发送到新连接而不知道客户端标识符或其他信息? 更新: 从 Shashi 提供的信息中,我发现这两页使用 retroactive consumer 描述
目前正在使用 WebLogic 和分布式队列。我从文档中了解到,分布式队列允许您使用全局 JNDI 名称检索到集群中任何队列的连接。分布式队列为您提供的主要功能之一似乎是跨多个托管服务器的负载平衡连接
再见,我的基本要求是有一个可以发送消息的路由,并将其放在 JMS 队列中。 camel 上下文在 JavaEE 6 容器中运行,即 JBoss AS 7.1.1,因此它是 HornetQ for JM
我正在阅读 JMS 2.0 规范,其中提到(相关摘录下方)如果客户端尝试修改 Message 对象,则 JMS 提供程序可能会抛出异常。 我的问题是 JMS 提供者如何知道客户端是否试图修改 Mess
我的 spring 上下文文件中有以下设置。 "PowerEventQueue" “${
我正在尝试使用 JSP 连接到 ActiveMQ。但是,当我运行该程序时,它给了我以下类型的异常: NoClassDefFoundError: javax/jms/Destination . 我不确定
我刚看了CORBA和JMS,他们好像都是用来实现的代理架构/模式。 我对他们有几个问题 1.他们之间的区别我还不是很清楚,谁能解释一下? 2.CORBA 是否用于当今的 IT 解决方案?还是正在失去魅
我正在更新现有的 Mule 配置,任务是增强它以根据消息的某些属性将消息路由到不同的端点,因此最好对我手头的两个选项有一些利弊: 在消息上添加属性,使用“message-properties-tran
我有一个订阅 JMS 主题应用程序的 Java 应用程序,该应用程序偶尔会出现以下异常: javax.jms.JMSException: Connection has been terminated
我知道 Camel 的 JMS 组件用于接收消息,使用 Springs DefaultMessageListenerContainer。它可以配置为使用 CLIENT_ACKNOWLEDGE 模式来确
通常不鼓励使用从 JMS 提供者返回的消息 ID 作为相关 ID,将消息发布到队列中。人们如何为请求/响应架构生成相关 ID? 最佳答案 客户端可以使用唯一的 ID 标准,如 UUID生成新的 ID。
我有一个简单的代码可以将 2 条消息放入队列中。 1) 我用两台服务器设置了 connectionNameList。 2) 这两个服务器是独立的,但有相同的队列管理器和定义相同名称的队列,例如“QMg
我是一名优秀的程序员,十分优秀!