gpt4 book ai didi

activemq - 我的过期 JMS 消息在哪里?

转载 作者:行者123 更新时间:2023-12-04 17:36:38 25 4
gpt4 key购买 nike

我使用的是 ActiveMQ 5.8.0 和 Camel 2.10.4。我正在从 JMS 队列读取 ExchangePattern.InOnly 消息,并希望将在给定时间内未处理的消息显式过期到命名的死信队列。

我有以下路线:

public class FulfillmentRequestRoute extends RouteBuilder {

@Override
public void configure() throws Exception {

errorHandler(deadLetterChannel("jms:queue:dead").useOriginalMessage());
from("jms:queue:fulfillmentRequest?explicitQosEnabled=true&timeToLive=10000&transacted=true")
.transacted()
.to("mock:initialProcessor");
}
}

以及以下 ActiveMQ 配置:
<!-- Configure the ActiveMQ JMS broker server to listen on TCP port 61610 -->
<broker:broker useJmx="true" persistent="true" brokerName="myBroker">
<broker:transportConnectors>
<!-- expose a VM transport for in-JVM transport between AMQ and Camel on the server side -->
<broker:transportConnector name="vm" uri="vm://myBroker" />
<!-- expose a TCP transport for clients to use -->
<broker:transportConnector name="tcp" uri="tcp://localhost:${tcp.port}" />
</broker:transportConnectors>
<broker:persistenceAdapter>
<broker:kahaPersistenceAdapter directory="target/olp-activemq-data" maxDataFileLength="33554432"/>
</broker:persistenceAdapter>
<broker:destinationPolicy>
<broker:policyMap>
<broker:policyEntries>
<!-- Set the following policy on all queues using the '>' wildcard -->
<broker:policyEntry queue=">">
<broker:deadLetterStrategy>
<broker:sharedDeadLetterStrategy processExpired="true"
processNonPersistent="true" />
</broker:deadLetterStrategy>
</broker:policyEntry>
</broker:policyEntries>
</broker:policyMap>
</broker:destinationPolicy>
</broker:broker>

<!-- Configure Camel ActiveMQ to use the embedded ActiveMQ broker declared above -->
<!-- Using the ActiveMQComponent gives us connection pooling for free -->
<bean id="jms" class="org.apache.activemq.camel.component.ActiveMQComponent">
<property name="brokerURL" value="vm://myBroker" />
<property name="transacted" value="true"/>
<property name="transactionManager" ref="jmsTransactionManager"/>
<property name="acceptMessagesWhileStopping" value="false"/>
</bean>
<bean id="jmsTransactionManager" class="org.springframework.jms.connection.JmsTransactionManager">
<property name="connectionFactory" ref="jmsConnectionFactory"/>
</bean>
<bean id="jmsConnectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
<property name="brokerURL" value="vm://myBroker" />
</bean>

最后我有一个单元测试,它创建两个消息,一个将被处理,另一个应该超时。
@RunWith(CamelSpringJUnit4ClassRunner.class)
@ContextConfiguration(locations = {"classpath:/META-INF/spring/camel-server.xml"})
public class FulfillmentRequestTimeoutTest {

@EndpointInject(uri = "mock:initialProcessor")
protected MockEndpoint mockEndpoint;

@Produce
protected ProducerTemplate template;

protected ConsumerTemplate consumer;

@Autowired
@Qualifier("camel-server")
protected CamelContext context;

@DirtiesContext
@Test
public void requestPutOnTimedOutQueueIfOlderThanTimeToLive() throws Exception {

// Given
consumer = context.createConsumerTemplate();

int expectedValidMessageCount = 2;
mockEndpoint.expectedMessageCount(expectedValidMessageCount);

// When
String xmlBody1 = "<?xml version=\"1.0\"?><body>THIS WILL NOT TIMEOUT</body>";
template.sendBody("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody1);

long ttl = System.currentTimeMillis() - 12000000;
String xmlBody2 = "<?xml version=\"1.0\"?><body>!!!!!TIMED OUT!!!!!</body>";
template.sendBodyAndHeader("jms:queue:fulfillmentRequest", ExchangePattern.InOnly, xmlBody2, "JMSExpiration", ttl);

// Then
// The second message is not processed
mockEndpoint.assertIsSatisfied(); // This should not pass with "2" set above

List<Exchange> list = mockEndpoint.getReceivedExchanges();
String notTimedOutMessageBody = (String) list.get(0).getIn().getBody(String.class);

assertEquals(xmlBody1, notTimedOutMessageBody);

Thread.sleep(5000);

// And is instead routed to the timedOut JMS queue
Object dlqBody = consumer.receiveBodyNoWait("jms:queue:dead");
assertNotNull("Should not lose the message", dlqBody); // This fails
assertEquals(xmlBody2, dlqBody);
}

@Configuration
public static class ContextConfig extends SingleRouteCamelConfiguration {

@Bean
public RouteBuilder route() {
return new FulfillmentRequestRoute();
}
}
}

尽管考虑到下面@Petter 的提示(感谢),但第二条消息根本没有到期。

我有这个单元测试模式在测试中的其他地方工作,这些测试显式地从 Camel 中的事务中抛出异常,但是当这一切似乎已经被处理时,我不希望自己手动开始查看标题。

最佳答案

在队列中已过期或在 ActiveMQ 中命中队列时的消息将采取以下任一操作:

  • 如果消息是持久的,那么它将被放入队列 ActiveMQ.DLQ .
  • 如果消息是非持久的,它将立即被丢弃,恕不另行通知。

  • 在您的 ActiveMQ 配置中,您已禁用持久性,因此快速猜测您的消息将被删除,而不会通知您的应用程序。

    阅读更多 here

    关于activemq - 我的过期 JMS 消息在哪里?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/16612716/

    25 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com