gpt4 book ai didi

java - 使用以下几个队列进行请求-回复

转载 作者:行者123 更新时间:2023-12-02 09:38:08 29 4
gpt4 key购买 nike

我有多个 InOut 模式的顺序队列,每个队列通向一个 Camel 处理器。如果处理器花费太多时间进行处理,我希望请求-答复检测到超时并发送 ExchangeTimedOutExceptionCamelJmsRequestTimeout 的超时有效,但仅适用于第一个队列。

例如,如果我有这个:

q1 -> p1 -> q2 -> p2 ->  q3 -> p3

q1 CamelJmsRequestTimeout 例如为 5 秒。如果每个处理器花费 3 秒(我用 sleep() 模拟处理器中的延迟),我将在 p2 处超时,因为总时间为 6 秒。为什么对 q1 的回复不是在 p1 之后发送,而是在 p3 之后发送?

这是示例代码:

from("jms:queue:q1")
.setExchangePattern(ExchangePattern.InOut)
.setHeader(JmsConstants.JMS_REQUEST_TIMEOUT, constant("5000"))
.process("p1")
.to("jms:queue:q2");

from("jms:queue:q2")
.setExchangePattern(ExchangePattern.InOut)
.setHeader(JmsConstants.JMS_REQUEST_TIMEOUT, constant("5000"))
.process("p2")
.to("jms:queue:q3");

from("jms:queue:q3")
.setExchangePattern(ExchangePattern.InOut)
.setHeader(JmsConstants.JMS_REQUEST_TIMEOUT, constant("5000"))
.process("p3");

我的设计可行吗?如果是,我应该使用哪个选项?

谢谢

编辑:

2019-08-07 09:45:02,718 [main] INFO  SpringCamelContext  - Apache Camel 2.21.0 (CamelContext: camel-1) started in 1.152 seconds
2019-08-07 09:45:05,973 [Camel (camel-1) thread #1 - JmsConsumer[q1]] INFO input - Exchange[ExchangePattern: InOut, BodyType: String, Body: stuff in the body]
2019-08-07 09:45:09,022 [Camel (camel-1) thread #2 - JmsConsumer[q2]] INFO input - Exchange[ExchangePattern: InOut, BodyType: String, Body: stuff in the body]
2019-08-07 09:45:11,991 [Camel (camel-1) thread #25 - JmsReplyManagerOnTimeout[q2]] WARN TemporaryQueueReplyManager - Timeout occurred after 5000 millis waiting for reply message with correlationID [Camel-ID-ITEM-S69138-1565163898705-0-2] on destination temp-queue://ID:ITEM-S69138-55901-1565163899733-4:20:1. Setting ExchangeTimedOutException on (MessageId: ID:ITEM-S69138-55901-1565163899733-4:19:1:1:1 on ExchangeId: ID-ITEM-S69138-1565163898705-0-1) and continue routing.
2019-08-07 09:45:11,996 [Camel (camel-1) thread #1 - JmsConsumer[q1]] WARN EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-2 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:20:1. Exchange[ID-ITEM-S69138-1565163898705-0-1]]
org.apache.camel.RuntimeCamelException: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-2 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:20:1. Exchange[ID-ITEM-S69138-1565163898705-0-1]
at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1830)
at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:196)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:117)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1168)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1160)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1057)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-2 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:20:1. Exchange[ID-ITEM-S69138-1565163898705-0-1]
at org.apache.camel.component.jms.reply.ReplyManagerSupport.processReply(ReplyManagerSupport.java:170)
at org.apache.camel.component.jms.reply.TemporaryQueueReplyHandler.onTimeout(TemporaryQueueReplyHandler.java:62)
at org.apache.camel.component.jms.reply.CorrelationTimeoutMap$1.run(CorrelationTimeoutMap.java:60)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
2019-08-07 09:45:11,997 [Camel (camel-1) thread #25 - JmsReplyManagerOnTimeout[q2]] ERROR DefaultErrorHandler - Failed delivery for (MessageId: ID:ITEM-S69138-55901-1565163899733-4:19:1:1:1 on ExchangeId: ID-ITEM-S69138-1565163898705-0-1). Exhausted after delivery attempt: 1 caught: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-2 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:20:1. Exchange[ID-ITEM-S69138-1565163898705-0-1]

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route1 ] [route1 ] [jms://queue:q1 ] [ 9051]
[route1 ] [setExchangePattern] [setExchangePattern[InOut] ] [ 6]
[route1 ] [setHeader1 ] [setHeader[CamelJmsRequestTimeout] ] [ 0]
[route1 ] [process1 ] [Processor@0x39652a30 ] [ 3001]
[route1 ] [to1 ] [log:input ] [ 4]
[route1 ] [to2 ] [jms:queue:q2 ] [ 6019]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-2 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:20:1. Exchange[ID-ITEM-S69138-1565163898705-0-1]
at org.apache.camel.component.jms.reply.ReplyManagerSupport.processReply(ReplyManagerSupport.java:170)
at org.apache.camel.component.jms.reply.TemporaryQueueReplyHandler.onTimeout(TemporaryQueueReplyHandler.java:62)
at org.apache.camel.component.jms.reply.CorrelationTimeoutMap$1.run(CorrelationTimeoutMap.java:60)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-08-07 09:45:12,062 [Camel (camel-1) thread #3 - JmsConsumer[q3]] INFO input - Exchange[ExchangePattern: InOut, BodyType: String, Body: stuff in the body]
2019-08-07 09:45:12,093 [Camel (camel-1) thread #4 - JmsConsumer[q4]] INFO input - Exchange[ExchangePattern: InOnly, BodyType: String, Body: stuff in the body]
2019-08-07 09:45:15,034 [Camel (camel-1) thread #28 - JmsReplyManagerOnTimeout[q3]] WARN TemporaryQueueReplyManager - Timeout occurred after 5000 millis waiting for reply message with correlationID [Camel-ID-ITEM-S69138-1565163898705-0-4] on destination temp-queue://ID:ITEM-S69138-55901-1565163899733-4:21:1. Setting ExchangeTimedOutException on (MessageId: ID:ITEM-S69138-55901-1565163899733-4:1:1:1:1 on ExchangeId: ID-ITEM-S69138-1565163898705-0-3) and continue routing.
2019-08-07 09:45:15,038 [Camel (camel-1) thread #2 - JmsConsumer[q2]] WARN EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-4 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:21:1. Exchange[ID-ITEM-S69138-1565163898705-0-3]]
org.apache.camel.RuntimeCamelException: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-4 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:21:1. Exchange[ID-ITEM-S69138-1565163898705-0-3]
at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1830)
at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:196)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:117)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1168)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1160)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1057)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-4 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:21:1. Exchange[ID-ITEM-S69138-1565163898705-0-3]
at org.apache.camel.component.jms.reply.ReplyManagerSupport.processReply(ReplyManagerSupport.java:170)
at org.apache.camel.component.jms.reply.TemporaryQueueReplyHandler.onTimeout(TemporaryQueueReplyHandler.java:62)
at org.apache.camel.component.jms.reply.CorrelationTimeoutMap$1.run(CorrelationTimeoutMap.java:60)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
2019-08-07 09:45:15,039 [Camel (camel-1) thread #28 - JmsReplyManagerOnTimeout[q3]] ERROR DefaultErrorHandler - Failed delivery for (MessageId: ID:ITEM-S69138-55901-1565163899733-4:1:1:1:1 on ExchangeId: ID-ITEM-S69138-1565163898705-0-3). Exhausted after delivery attempt: 1 caught: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-4 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:21:1. Exchange[ID-ITEM-S69138-1565163898705-0-3]

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route2 ] [route2 ] [jms://queue:q2 ] [ 9017]
[route2 ] [setExchangePattern] [setExchangePattern[InOut] ] [ 0]
[route2 ] [setHeader2 ] [setHeader[CamelJmsRequestTimeout] ] [ 0]
[route2 ] [process2 ] [Processor@0x5763a655 ] [ 3001]
[route2 ] [to3 ] [log:input ] [ 2]
[route2 ] [to4 ] [jms:queue:q3 ] [ 6012]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-4 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:21:1. Exchange[ID-ITEM-S69138-1565163898705-0-3]
at org.apache.camel.component.jms.reply.ReplyManagerSupport.processReply(ReplyManagerSupport.java:170)
at org.apache.camel.component.jms.reply.TemporaryQueueReplyHandler.onTimeout(TemporaryQueueReplyHandler.java:62)
at org.apache.camel.component.jms.reply.CorrelationTimeoutMap$1.run(CorrelationTimeoutMap.java:60)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2019-08-07 09:45:18,071 [Camel (camel-1) thread #29 - JmsReplyManagerOnTimeout[q4]] WARN TemporaryQueueReplyManager - Timeout occurred after 5000 millis waiting for reply message with correlationID [Camel-ID-ITEM-S69138-1565163898705-0-6] on destination temp-queue://ID:ITEM-S69138-55901-1565163899733-4:22:1. Setting ExchangeTimedOutException on (MessageId: ID:ITEM-S69138-55901-1565163899733-4:2:1:1:1 on ExchangeId: ID-ITEM-S69138-1565163898705-0-5) and continue routing.
2019-08-07 09:45:18,072 [Camel (camel-1) thread #3 - JmsConsumer[q3]] WARN EndpointMessageListener - Execution of JMS message listener failed. Caused by: [org.apache.camel.RuntimeCamelException - org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-6 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:22:1. Exchange[ID-ITEM-S69138-1565163898705-0-5]]
org.apache.camel.RuntimeCamelException: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-6 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:22:1. Exchange[ID-ITEM-S69138-1565163898705-0-5]
at org.apache.camel.util.ObjectHelper.wrapRuntimeCamelException(ObjectHelper.java:1830)
at org.apache.camel.component.jms.EndpointMessageListener$EndpointMessageListenerAsyncCallback.done(EndpointMessageListener.java:196)
at org.apache.camel.component.jms.EndpointMessageListener.onMessage(EndpointMessageListener.java:117)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:719)
at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:679)
at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:649)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:317)
at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:255)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1168)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1160)
at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1057)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-6 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:22:1. Exchange[ID-ITEM-S69138-1565163898705-0-5]
at org.apache.camel.component.jms.reply.ReplyManagerSupport.processReply(ReplyManagerSupport.java:170)
at org.apache.camel.component.jms.reply.TemporaryQueueReplyHandler.onTimeout(TemporaryQueueReplyHandler.java:62)
at org.apache.camel.component.jms.reply.CorrelationTimeoutMap$1.run(CorrelationTimeoutMap.java:60)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
... 3 more
2019-08-07 09:45:18,073 [Camel (camel-1) thread #29 - JmsReplyManagerOnTimeout[q4]] ERROR DefaultErrorHandler - Failed delivery for (MessageId: ID:ITEM-S69138-55901-1565163899733-4:2:1:1:1 on ExchangeId: ID-ITEM-S69138-1565163898705-0-5). Exhausted after delivery attempt: 1 caught: org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-6 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:22:1. Exchange[ID-ITEM-S69138-1565163898705-0-5]

Message History
---------------------------------------------------------------------------------------------------------------------------------------
RouteId ProcessorId Processor Elapsed (ms)
[route3 ] [route3 ] [jms://queue:q3 ] [ 9010]
[route3 ] [setExchangePattern] [setExchangePattern[InOut] ] [ 0]
[route3 ] [setHeader3 ] [setHeader[CamelJmsRequestTimeout] ] [ 0]
[route3 ] [process3 ] [Processor@0x40c8067 ] [ 3000]
[route3 ] [to5 ] [log:input ] [ 0]
[route3 ] [to6 ] [jms:queue:q4 ] [ 6009]

Stacktrace
---------------------------------------------------------------------------------------------------------------------------------------
org.apache.camel.ExchangeTimedOutException: The OUT message was not received within: 5000 millis due reply message with correlationID: Camel-ID-ITEM-S69138-1565163898705-0-6 not received on destination: temp-queue://ID:ITEM-S69138-55901-1565163899733-4:22:1. Exchange[ID-ITEM-S69138-1565163898705-0-5]
at org.apache.camel.component.jms.reply.ReplyManagerSupport.processReply(ReplyManagerSupport.java:170)
at org.apache.camel.component.jms.reply.TemporaryQueueReplyHandler.onTimeout(TemporaryQueueReplyHandler.java:62)
at org.apache.camel.component.jms.reply.CorrelationTimeoutMap$1.run(CorrelationTimeoutMap.java:60)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

最佳答案

这里重要的一点是您执行3 个单独的 JMS 交互

因此,如果您根本没有超时,则发送者将在大约 9 秒后在其监听的应答队列上收到应答。因为发送者不知道 q1 背后的事情,所以这看起来像

Sender > q1 > listen on answer-q and wait > answer-message

当您在发送方将超时设置为 5 秒时,当达到超时时,请求将被丢弃。

Sender > q1 > listen on answer-q > stop listening after 5 seconds

但是,当发送方停止监听 q1 的答案时,这不会影响其他 JMS 交互!每个 JMS 生产者在发送消息时都会启动超时。

q1-route 的生产者在收到来自发送者的消息后大约 3 秒向 q2 发送一条消息。因为从此时开始处理至少需要6秒,所以也会遇到超时。因此它也会在 5 秒后停止监听答案。

q2-route 的生产者从 q1-route 收到消息后约 3 秒向 q3 发送消息,发送者发送初始消息后约 6 秒。因为从这一点开始的处理大约需要 3 秒,所以它将从 q3 收到答案

然后它会尝试将此答案发送回 q1-route 的生产者,但这个答案已经停止监听。我认为这会失败,但我不知道临时应答队列的确切行为。

总结一下:将计算出最终的处理结果,但由于这需要太长时间,因此无法发送回发送方

关于java - 使用以下几个队列进行请求-回复,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57328964/

29 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com