gpt4 book ai didi

java - 如何在使用 apache qpid 时处理 jms 消息传递中的生产者流控制

转载 作者:行者123 更新时间:2023-11-30 07:28:58 24 4
gpt4 key购买 nike

我正在尝试处理生产者端的流量控制情况。我在设置了最大队列大小的 qpid-broker 上有一个队列。还在队列上设置 flow_stop_count 和 flow_resume_count。

现在生产者不断地生产消息,直到达到这个 flow_stop_count。违反此计数时,将抛出异常,由异常监听器处理。现在一段时间后,队列中的消费者将 catch 并达到 flow_resume_count。问题是生产者如何知道这个事件。

这里是producer的示例代码

    connection connection = connectionFactory.createConnection();
connection.setExceptionListenr(new MyExceptionListerner());
connection.start();
Session session = connection.createSession(false,Session.CLIENT_ACKNOWLEDGE);
Queue queue = (Queue)context.lookup("Test");
MessageProducer producer = session.createProducer(queue);
while(notStopped){
while(suspend){//---------------------------how to resume this flag???
Thread.sleep(1000);
}
TextMessage message = session.createTextMessage();
message.setText("TestMessage");
producer.send(message);
}
session.close();
connection.close();

对于异常监听器

    private class MyExceptionListener implements ExceptionListener {
public void onException(JMSException e) {
System.out.println("got exception:" + e.getMessage());
suspend=true;
}
}

现在 exceptionlistener 是一个通用的异常监听器,因此暂停生产者流程不是一个好主意。

我需要的可能是生产者级别的一些方法,例如 produer.isFlowStopped() 之类的东西,我可以用它在发送消息之前进行检查。 qpid api中是否存在这样的功能。

关于 qpid website 有一些文档这表明可以做到这一点。但我在任何地方都找不到这样做的例子。

是否有一些标准的方法来处理这种情况。

最佳答案

从我从 Apache QPid 文档中读到的内容来看,flow_resume_count 和 flow_stop_count 似乎会导致生产者开始被阻止。

因此,唯一的选择是在软件方面定期轮询,直到消息再次开始流动。

摘自 here .

If a producer sends to a queue which is overfull, the broker will respond by instructing the client not to send any more messages. The impact of this is that any future attempts to send will block until the broker rescinds the flow control order.

While blocking the client will periodically log the fact that it is blocked waiting on flow control.

WARN AMQSession - Broker enforced flow control has been enforced WARN AMQSession - Message send delayed by 5s due to broker enforced flow control WARN AMQSession - Message send delayed by 10s due to broker enforced flow control After a set period the send will timeout and throw a JMSException to the calling code.

ERROR AMQSession - Message send failed due to timeout waiting on broker enforced flow control.

从该文档中可以看出,管理生产者的软件必须进行 self 管理。所以基本上,当您收到队列过满的异常时,您将需要退后,很可能轮询并重新尝试发送您的消息。

关于java - 如何在使用 apache qpid 时处理 jms 消息传递中的生产者流控制,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/8848328/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com