gpt4 book ai didi

java - RabbitMQ 没有选择正确的消费者

转载 作者:行者123 更新时间:2023-12-02 02:00:07 26 4
gpt4 key购买 nike

我从这里拿了例子http://www.rabbitmq.com/tutorials/tutorial-six-java.html ,又添加了一个来自 RPCClient 的 RPC 调用,并添加了一些到 stdout 的日志记录。因此,当执行第二次调用时,rabbitmq 使用具有错误相关 ID 的消费者,这不是预期的行为。这是一个错误还是我弄错了什么?

RPC服务器:

package com.foo.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class RPCServer {

private static final String RPC_QUEUE_NAME = "sap-consume";

private static int fib(int n) {
if (n ==0) return 0;
if (n == 1) return 1;
return fib(n-1) + fib(n-2);
}

public static void main(String[] argv) {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);

Connection connection = null;
try {
connection = factory.newConnection();
final Channel channel = connection.createChannel();

channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);

channel.basicQos(1);

System.out.println(" [x] Awaiting RPC requests");

Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties
.Builder()
.correlationId(properties.getCorrelationId())
.build();

String response = "";

try {
String message = new String(body,"UTF-8");
int n = Integer.parseInt(message);

System.out.println(" [.] fib(" + message + ")");
response += fib(n);
}
catch (RuntimeException e){
System.out.println(" [.] " + e.toString());
}
finally {
channel.basicPublish( "", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
channel.basicAck(envelope.getDeliveryTag(), false);
// RabbitMq consumer worker thread notifies the RPC server owner thread
synchronized(this) {
this.notify();
}
}
}
};

channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
// Wait and be prepared to consume the message from RPC client.
while (true) {
synchronized(consumer) {
try {
consumer.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
} catch (IOException | TimeoutException e) {
e.printStackTrace();
}
finally {
if (connection != null)
try {
connection.close();
} catch (IOException _ignore) {}
}
}
}

RPC客户端:

package com.bar.rabbitmq;

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeoutException;

public class RPCClient {

private Connection connection;
private Channel channel;
private String requestQueueName = "sap-consume";
private String replyQueueName;

public RPCClient() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);

connection = factory.newConnection();
channel = connection.createChannel();

replyQueueName = channel.queueDeclare().getQueue();
}

public String call(String message) throws IOException, InterruptedException {
final String corrId = UUID.randomUUID().toString();

AMQP.BasicProperties props = new AMQP.BasicProperties
.Builder()
.correlationId(corrId)
.replyTo(replyQueueName)
.build();

channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);

channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
if (properties.getCorrelationId().equals(corrId)) {
System.out.println("Correlation Id" + properties.getCorrelationId() + " corresponds to expected one.");
response.offer(new String(body, "UTF-8"));
} else {
System.out.println("Correlation Id" + properties.getCorrelationId() + " doesn't correspond to expected one " + corrId);
}
}
});

return response.take();
}

public void close() throws IOException {
connection.close();
}

public static void main(String[] argv) {
RPCClient rpc = null;
String response = null;
try {
rpc = new RPCClient();

System.out.println(" [x] Requesting fib(30)");
response = rpc.call("30");
System.out.println(" [.] Got '" + response + "'");
System.out.println(" [x] Requesting fib(40)");
response = rpc.call("40");
System.out.println(" [.] Got '" + response + "'");
} catch (IOException | TimeoutException | InterruptedException e) {
e.printStackTrace();
} finally {
if (rpc != null) {
try {
rpc.close();
} catch (IOException _ignore) {
}
}
}
}
}

最佳答案

是的,您在教程代码中发现了一个错误。我已经在此处打开了一个拉取请求来修复它,您也可以找到所发生情况的解释:

https://github.com/rabbitmq/rabbitmq-tutorials/pull/174

<小时/>

注意: RabbitMQ 团队监控 the rabbitmq-users mailing list并且有时只在 StackOverflow 上回答问题。

关于java - RabbitMQ 没有选择正确的消费者,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51706696/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com