gpt4 book ai didi

java - Camel/ActiveMQ 事务、重新交付和 DLQ

转载 作者:行者123 更新时间:2023-11-30 11:17:33 25 4
gpt4 key购买 nike

使用 Fabric8 379 构建。

目前正在努力使用 ActiveMQ 和 Camel 让 TransactionErrorHandler 的预期行为按预期工作。

首先根据 Camel 错误处理程序文档(http://camel.apache.org/error-handler.html),如果我按照建议调用 TransactionErrorHandler,即

<errorHandler id="txEH" type="TransactionErrorHandler">
<redeliveryPolicy logStackTrace="false" logExhausted="false" maximumRedeliveries="3"/>
</errorHandler>

我得到一个错误:

Caused by: org.xml.sax.SAXParseException: cvc-enumeration-valid: Value 'TransactionErrorHandler' is not facet-valid with respect to enumeration '[DeadLetterChannel, DefaultErrorHandler, NoErrorHandler, LoggingErrorHandler]'. It must be a value from the enumeration.

这很公平,我想 TransactionErrorHandler 已从模式中删除并且必须以不同方式调用?因此,如果我采用替代路线并像这样指定一个 TransactionErrorHandler bean:

<bean id="transactionErrorHandler"
class="org.apache.camel.spring.spi.TransactionErrorHandlerBuilder">
<property name="deadLetterUri" value="activemq:queue:ActiveMQ.DLQ" />
<property name="redeliveryPolicy" ref="redeliveryPolicy" />
</bean>

<bean id="redeliveryPolicy" class="org.apache.camel.processor.RedeliveryPolicy">
<property name="backOffMultiplier" value="2" />
<property name="maximumRedeliveries" value="2" />
<property name="redeliveryDelay" value="1000" />
<property name="useExponentialBackOff" value="true" />
</bean>

我可以通过指定 errorHandlerRef="transactionErrorHandler"在我的 route 成功使用它。但是,在测试此场景时,将完全忽略重新传递策略,重新传递尝试次数为 6 次(默认),而不是上面指定的 2 次。我希望有人能为我指出正确的方向,说明如何在路由中正确指定 TransactionErrorHandler。下面是我当前的测试 blueprint.xml,它被部署到一个结构上:

<blueprint xmlns="http://www.osgi.org/xmlns/blueprint/v1.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:cm="http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0"
xmlns:camel="http://camel.apache.org/schema/blueprint"
xsi:schemaLocation="http://www.osgi.org/xmlns/blueprint/v1.0.0 http://www.osgi.org/xmlns/blueprint/v1.0.0/blueprint.xsd
http://camel.apache.org/schema/blueprint http://camel.apache.org/schema/blueprint/camel-blueprint.xsd
http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
http://aries.apache.org/blueprint/xmlns/blueprint-cm/v1.0.0 http://aries.apache.org/schemas/blueprint-cm/blueprint-cm-1.0.0.xsd">

<!-- blueprint property placeholders -->
<cm:property-placeholder id="test-adapter" persistent-id="uk.test.transactions">
<cm:default-properties>
<cm:property name="amqBrokerURL" value="discovery:(fabric:platform)" />
<cm:property name="amqBrokerUserName" value="admin" />
<cm:property name="amqBrokerPassword" value="admin" />
</cm:default-properties>
</cm:property-placeholder>

<camelContext xmlns="http://camel.apache.org/schema/blueprint" id="TestRouteContext" useMDCLogging="true">

<!-- <errorHandler id="txEH" type="TransactionErrorHandler">
<redeliveryPolicy logStackTrace="false"
logExhausted="false" />
</errorHandler> -->

<route id="platform-test-route" errorHandlerRef="txEH">
<from uri="activemq:queue:test-queue-in" />
<transacted ref="transactionPolicy" />
<!-- Basic Bean that logs a message -->
<bean ref="stubSuccess" />
<!-- Basic Bean that throws a java.lang.Exception-->
<bean ref="stubFailure" />
<to uri="activemq:queue:test-queue-out" />
</route>

</camelContext>

<bean id="stubSuccess" class="uk.test.transactions.stubs.StubSuccess" />

<bean id="stubFailure" class="uk.test.transactions.stubs.StubFailure" />

<bean id="transactionErrorHandler"
class="org.apache.camel.spring.spi.TransactionErrorHandlerBuilder">
<property name="deadLetterUri" value="activemq:queue:ActiveMQ.DLQ" />
<property name="redeliveryPolicy" ref="redeliveryPolicy" />
</bean>

<bean id="transactionPolicy" class="org.apache.camel.spring.spi.SpringTransactionPolicy">
<property name="transactionManager" ref="jmsTransactionManager" />
<property name="propagationBehaviorName" value="PROPAGATION_REQUIRED" />
</bean>

<bean id="jmsTransactionManager"
class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsPooledConnectionFactory" />
</bean>

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="connectionFactory" ref="jmsPooledConnectionFactory" />
<property name="transacted" value="true" />
<property name="transactionManager" ref="jmsTransactionManager" />
<property name="cacheLevelName" value="CACHE_CONSUMER" />
</bean>

<bean id="jmsPooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactory"
init-method="start" destroy-method="stop">
<property name="maxConnections" value="1" />
<property name="connectionFactory" ref="jmsConnectionFactory" />
</bean>

<!-- <bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<property name="backOffMultiplier" value="2" />
<property name="initialRedeliveryDelay" value="2000" />
<property name="maximumRedeliveries" value="2" />
<property name="redeliveryDelay" value="1000" />
<property name="useExponentialBackOff" value="true" />
</bean> -->

<bean id="redeliveryPolicy" class="org.apache.camel.processor.RedeliveryPolicy">
<property name="backOffMultiplier" value="2" />
<property name="maximumRedeliveries" value="2" />
<property name="redeliveryDelay" value="1000" />
<property name="useExponentialBackOff" value="true" />
</bean>

<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="${amqBrokerURL}" />
<property name="userName" value="${amqBrokerUserName}" />
<property name="password" value="${amqBrokerPassword}" />
<property name="watchTopicAdvisories" value="false" />
<!-- <property name="redeliveryPolicy" ref="redeliveryPolicy" /> -->
</bean>

</blueprint>

如果有人能看出我哪里出错了,将不胜感激。

最佳答案

当您使用 TX 时,您应该在 AMQ 经纪人上配置重新交付选项,它是负责重新交付的经纪人(而不是 Camel)。

关于java - Camel/ActiveMQ 事务、重新交付和 DLQ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/24303048/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com