gpt4 book ai didi

java - RabbitMQ 直接回复。我收到了 AlreadyClosedException

转载 作者:行者123 更新时间:2023-12-03 23:14:21 25 4
gpt4 key购买 nike

解决:移动

channel.basicPublish("", QUEUE, props, message.getBytes());



以下

channel.basicConsume(replyQueue, ...)



这解决了问题。

我正在尝试弄清楚如何使用 RabbitMQ 直接回复功能。自 documentation关于如何实现它相当模糊,我尝试使用 RPC例如,采用它来代替直接回复。
private final static String QUEUE = "Test_chan";
private void directReplyToClient(ConnectionFactory factory) {
Connection connection = null;
Channel channel = null;
String replyQueue;

try {
connection = factory.newConnection();
channel = connection.createChannel();

//replyQueue = channel.queueDeclare().getQueue();
replyQueue = "amq.rabbitmq.reply-to";
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.replyTo(replyQueue)
.build();
String message = "Hello World";
channel.basicPublish("", QUEUE, props, message.getBytes());

final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);

channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {

response.offer(new String(body, "UTF-8"));

}
});

System.out.println(response.take());

} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (channel != null)
channel.close();
if (connection != null)
connection.close();
} catch (IOException | TimeoutException _ignore) {}
}
}

设置回复地址为

channel.queueDeclare().getQueue()



有效,但将其设置为

amq.rabbitmq.reply-to



给出以下异常:

Exception in thread "main" com.rabbitmq.client.AlreadyClosedException: channel is already closed due to channel error; protocol method: method(reply-code=406, reply-text=PRECONDITION_FAILED - fast reply consumer does not exist, class-id=60, method-id=40)



有没有人看到我做错了什么?任何指针将不胜感激。

最佳答案

所以这是解决方案的代码。在发布之前进行消费。

private final static String QUEUE = "Test_chan";

private void directReplyToProducer(ConnectionFactory factory) {
Connection connection = null;
Channel channel = null;
String replyQueue;

try {
connection = factory.newConnection();
channel = connection.createChannel();

replyQueue = "amq.rabbitmq.reply-to";
AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.replyTo(replyQueue)
.build();
String message = "Hello World";

final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);
System.out.println(" [x] Sent x'" + message + "'");

channel.basicConsume(replyQueue, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws UnsupportedEncodingException {
response.offer(new String(body, "UTF-8"));
}
});
channel.basicPublish("", QUEUE, props, message.getBytes());

System.out.println(response.take());
Thread.sleep(10000);

} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (channel != null)
channel.close();
if (connection != null)
connection.close();
} catch (IOException | TimeoutException _ignore) {}
}
}

关于java - RabbitMQ 直接回复。我收到了 AlreadyClosedException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46642842/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com