gpt4 book ai didi

jms - 如何为 CXF 重新发布失败的 QPid 消息进行 JMS 传输?或者,我是否缺少更好的解决方案?

转载 作者:行者123 更新时间:2023-12-05 01:20:49 25 4
gpt4 key购买 nike

我正在为我的公司编写电子邮件网络服务。主要要求之一是保证交付,因此我们在 JMS 传输上有一个薄的 HTTP 层,使用持久的 QPid 队列。

我遇到的问题之一是处理过程中的错误处理。如果我在出现错误时简单地回滚事务,消息就会到达队列的头部。如果错误足够普遍,这可能会锁定整个队列,直到有人手动干预,我们希望通过回发到头部来避免这种情况,以便同时处理消息。

然而,这就是我的问题。首先,虽然 AMQP 有一种机制以原子方式“拒绝并重新排队”消息而不是确认它,但 JMS 似乎没有任何类似的功能,所以访问它的唯一方法是通过强制转换,这将我与具体的 JMS 实现。此外,CXF 的 JMS 传输似乎没有任何方法可以在传输级别覆盖或注入(inject)行为,这意味着我要么编写字节码代理,要么更改代码并重新编译以获得我想要的行为。

为了解决这个问题,我考虑过在 CXF 中实现一个错误处理程序的想法,它只是从 CXF 消息中重建 JMS 消息,然后重新排队。但是后来我不能使用事务处理 session ,因为故障导致我无法覆盖的回滚,然后我将在头部(来自回滚)和尾部(从重新排队)。而且我不能使用 CLIENT_ACKNOWLEDGE,因为 JMS 传输会在 提交消息进行处理之前确认消息,这意味着如果服务器在错误的时间停机,我可能会丢失一条消息。

所以基本上,只要我坚持接受 JMS 传输的默认行为,似乎就不可能在不损害数据完整性的情况下获得我想要的行为(失败消息的重新排队)。

一位同事建议完全避开 JMS 传输,并直接调用队列。服务实现将是一个骨架类,它的存在只是为了将消息放入队列,而另一个进程将实现一个消息监听器。对我来说,这个解决方案不是最优的,因为我失去了不可知 Web 服务的优雅,并且通过将我的实现与底层技术耦合而失去了一些可扩展性。

我还考虑过使用 RabbitMQ 客户端库为 AMQP 编写 CXF 传输。这将需要更长的时间,但它是我们公司可以继续使用的东西,也许可以回馈给 CXF 项目。也就是说,我对这个想法并不狂热,因为编写、运行和测试代码需要花费大量时间。

这是我的 CXF beans.xml:

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:jaxrs="http://cxf.apache.org/jaxrs"
xmlns:jms="http://cxf.apache.org/transports/jms"
xmlns:p="http://www.springframework.org/schema/p"
xsi:schemaLocation="
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://cxf.apache.org/jaxrs http://cxf.apache.org/schemas/jaxrs.xsd
http://cxf.apache.org/transports/jms http://cxf.apache.org/schemas/configuration/jms.xsd">

<import resource="classpath:META-INF/cxf/cxf.xml" />
<import resource="classpath:META-INF/cxf/cxf-extension-soap.xml" />
<import resource="classpath:META-INF/cxf/cxf-servlet.xml" />

<context:component-scan base-package="com.edo" />

<bean id="jmsConnectionFactory" class="org.apache.qpid.client.AMQConnectionFactory">
<constructor-arg name="broker" value="tcp://localhost:5672"/>
<constructor-arg name="username" value="guest"/>
<constructor-arg name="password" value="guest"/>
<constructor-arg name="clientName" value=""/>
<constructor-arg name="virtualHost" value=""/>
</bean>

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" p:explicitQosEnabled="true" p:deliveryMode="1" p:timeToLive="5000" p:connectionFactory-ref="jmsConnectionFactory" p:sessionTransacted="false" p:sessionAcknowledgeModeName="CLIENT_ACKNOWLEDGE" />
<bean id="jmsConfig" class="org.apache.cxf.transport.jms.JMSConfiguration" p:connectionFactory-ref="jmsConnectionFactory" p:wrapInSingleConnectionFactory="false" p:jmsTemplate-ref="jmsTemplate" p:timeToLive="500000" p:sessionTransacted="false" p:concurrentConsumers="1" p:maxSuspendedContinuations="0" p:maxConcurrentConsumers="1" />

<jms:destination id="jms-destination-bean" name="{http://test.jms.jaxrs.edo.com/}HelloWorldImpl.jms-destination">
<jms:address jndiConnectionFactoryName="ConnectionFactory" jmsDestinationName="TestQueue">
<jms:JMSNamingProperty name="java.naming.factory.initial" value="org.apache.activemq.jndi.ActiveMQInitialContextFactory"/>
<jms:JMSNamingProperty name="java.naming.provider.url" value="tcp://localhost:5672"/>
</jms:address>
<jms:jmsConfig-ref>jmsConfig</jms:jmsConfig-ref>
</jms:destination>

<jaxrs:server id="helloWorld" address="/HelloWorld" transportId="http://cxf.apache.org/transports/jms">
<jaxrs:serviceBeans>
<ref bean="helloWorldBean"/>
</jaxrs:serviceBeans>
<jaxrs:inInterceptors>
<bean class="com.edo.jaxrs.jms.test.FlowControlInInterceptor" p:periodMs="1000" p:permitsPerPeriod="18" />
</jaxrs:inInterceptors>
<jaxrs:providers>
<bean class="org.apache.cxf.jaxrs.provider.JSONProvider">
<property name="produceMediaTypes" ref="jsonTypes"/>
<property name="consumeMediaTypes" ref="jsonTypes"/>
</bean>
</jaxrs:providers>
</jaxrs:server>

<bean id="http-jms-config" class="com.edo.jaxrs.jms.test.HttpOverJmsConfig"
p:jmsFactory-ref="jmsConnectionFactory"
p:jmsDestinationName="TestQueue" />

<util:list id="jsonTypes">
<value>application/json</value>
<value>application/jettison</value>
</util:list>

</beans>

我缺少什么简单的东西吗?或者是否有更好的方法来解决这个问题?

最佳答案

所以 - 我采纳了我同事的建议,不对 Web 服务使用 JMS 传输。相反,我们将在 Spring Integration 上创建一个薄的 Web 服务层。这应该允许我们进行所需的控制粒度,而不会不必要地暴露消息传递层。

关于jms - 如何为 CXF 重新发布失败的 QPid 消息进行 JMS 传输?或者,我是否缺少更好的解决方案?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8424522/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com