gpt4 book ai didi

java - Camel Route无限运行以移动JMS消息

转载 作者:行者123 更新时间:2023-12-02 12:00:44 24 4
gpt4 key购买 nike

我正在尝试使用 Camel 路由器以 5 分钟的周期间隔将消息从 Activity MQ 中的队列 1(死信队列)移动到队列 2。我正在使用下面的代码来实现这一点:-

    public class MessageRouteBuilder extends RouteBuilder {

private static final Logger LOG =
LoggerFactory.getLogger(MessageRouteBuilder.class);

/*
* (non-Javadoc)
*
* @see org.apache.camel.builder.RouteBuilder#configure()
*/
@Override
public void configure() throws Exception {
LOG.info("Routing of camel is started");
CronScheduledRoutePolicy startPolicy = new CronScheduledRoutePolicy();
startPolicy.setRouteStartTime("0 0/5 * * * ?");

from(
"jms:queue:DLQ.Consumer.OUTDOCS.VirtualTopic.queue1")
.routeId("DLQMessageMoverID").routePolicy(startPolicy)
.noAutoStartup()
.to("jms:queue:Consumer.OUTDOCS.VirtualTopic.queue1");
LOG.info("Routing of camel is done");

}

}


@Startup
@Singleton
public class ScheduledMessageDLQConsumer {

@Inject
private MessagingUtil msgUtil;

@Inject
private MessageRouteBuilder builder;

private static final Logger LOG =
LoggerFactory.getLogger(ScheduledMessageDLQConsumer.class);
@PostConstruct
public void init() {
LOG.info("camel Scheduling scheduled started");
CamelContext camelContext = new DefaultCamelContext();
ConnectionFactory connectionFactory = msgUtil.getAMQConnectionFactory();
camelContext.addComponent("jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));

try {
camelContext.addRoutes(builder);
camelContext.start();
LOG.info("Camel scheduling completed");
} catch (Exception e) {
// TODO Auto-generated catch block

LOG.error("Error in registering camel route builder", e);
}

LOG.info(" camel Scheduling scheduled completed");
}

}

这里的问题是:- Camel 路由在 5 分钟后启用。它将消息从 DLQ (DLQ.Consumer.OUTDOCS.VirtualTopic.queue1) 移动到queue1 (Consumer.OUTDOCS.VirtualTopic.queue1)。但如果消息是有毒的,它会再次返回到DLQ,并再次路由将消息从DLQ移动到正常队列,并且这个过程继续无限运行。

我的要求是路由应该每 5 分钟后仅将消息从 DLQ 移动到队列一次?如果出现有毒消息,它应该仅在 5 分钟后检查。

最佳答案

首先,你的整个想法看起来很糟糕。重新处理和重新交付应在消费者或经纪人上处理,没有任何晦涩的期刊“DLQMessageMover”。如果您可以控制从 OUTDOCS.VirtualTopic.queue1 消耗的应用程序,请重新考虑错误处理的概念。

顺便说一句maximumRedeliveries的简单组合=-1 和 redeliveryDelay =300000 对消费者连接的影响与您在这个问题中编写的所有代码相同。

其次,您需要idempotent consumer header 上带有名为 JMSCorrelationID 的相关键。此过程仅处理每个相关 ID 一次。当使用MemoryIdempotentRepository时,在路由重启时它会被清除,因此消息会被再次处理,这符合您的要求。

我创建了一个小例子来展示它是如何工作的。在您的情况下,将不会模拟 JMSCorrelationID header 和 jms 组件而不是计时器。

public class IdempotentConsumerRouteBuilder extends RouteBuilder {
private final IdempotentRepository idempotentRepository = new MemoryIdempotentRepository();
private final List<String> mockCorrelationIds = Arrays.asList("id0","id0","id0","id1","id2","id0","id4","id0","id6","id7");

public void configure() {
CronScheduledRoutePolicy startPolicy = new CronScheduledRoutePolicy();
startPolicy.setRouteStopTime("0 0/5 * * * ?");
startPolicy.setRouteStartTime("0 0/5 * * * ?");

from("timer:jms?period=100")
.routePolicy(startPolicy)
.process(e -> e.getIn().setHeader(
"JMSCorrelationID", //Mock JMSCorrelationID to work with timer as it is jms component
mockCorrelationIds.get(e.getProperty("CamelTimerCounter", Integer.class)%10))
)
.idempotentConsumer(header("JMSCorrelationID"), idempotentRepository)
.log("correlationId is ${header.JMSCorrelationID}")
.to(("log:done?level=OFF"))
.end();

}}

这段代码的输出:

[artzScheduler-camel-1_Worker-3] DefaultCamelContext            INFO  Route: route1 started and consuming from: timer://jms?period=100
[mel-1) thread #4 - timer://jms] route1 INFO correlationId is id0
[mel-1) thread #4 - timer://jms] route1 INFO correlationId is id1
[mel-1) thread #4 - timer://jms] route1 INFO correlationId is id2
[mel-1) thread #4 - timer://jms] route1 INFO correlationId is id4
[mel-1) thread #4 - timer://jms] route1 INFO correlationId is id6
[mel-1) thread #4 - timer://jms] route1 INFO correlationId is id7
[artzScheduler-camel-1_Worker-6] DefaultShutdownStrategy INFO Starting to graceful shutdown 1 routes (timeout 10000 milliseconds)
[el-1) thread #5 - ShutdownTask] DefaultShutdownStrategy INFO Route: route1 shutdown complete, was consuming from: timer://jms?period=100
[artzScheduler-camel-1_Worker-6] DefaultShutdownStrategy INFO Graceful shutdown of 1 routes completed in 0 seconds
[artzScheduler-camel-1_Worker-6] DefaultCamelContext INFO Route: route1 is stopped, was consuming from: timer://jms?period=100
[artzScheduler-camel-1_Worker-8] ScheduledRoutePolicy WARN Route is not in a started/suspended state and cannot be stopped. The current route state is Stopped
[artzScheduler-camel-1_Worker-7] DefaultCamelContext INFO Route: route1 started and consuming from: timer://jms?period=100
[mel-1) thread #6 - timer://jms] route1 INFO correlationId is id0
[mel-1) thread #6 - timer://jms] route1 INFO correlationId is id1
[mel-1) thread #6 - timer://jms] route1 INFO correlationId is id2
[mel-1) thread #6 - timer://jms] route1 INFO correlationId is id4
[mel-1) thread #6 - timer://jms] route1 INFO correlationId is id6
[mel-1) thread #6 - timer://jms] route1 INFO correlationId is id7
[rtzScheduler-camel-1_Worker-10] DefaultShutdownStrategy INFO Starting to graceful shutdown 1 routes (timeout 10000 milliseconds)

关于java - Camel Route无限运行以移动JMS消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47269017/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com