gpt4 book ai didi

java - 从 Java 客户端中的多个 RabbitMQ 交换中读取,无需轮询

转载 作者:太空宇宙 更新时间:2023-11-04 06:58:59 25 4
gpt4 key购买 nike

请解释如何配置 Java 客户端以从两个不同的 RabbitMQ 交换中读取数据而不进行轮询。我希望客户端在消息到达时唤醒,然后再次阻塞。

在我的小系统集成问题中,一个 RabbitMQ 交换器使用各种路由键(我知道如何使用通配符来捕获它们)携带工作消息,而另一个交换器携带控制消息(例如“停止”)。所以我的客户必须听取来自两个地方的消息。这是一个相对低容量的系统问题,我不是在问负载共享或公平性等。

当然,我可以运行一个线程来轮询每个交换、 sleep 、调度,永远。但我想避免轮询。

不知何故,我想起了 Unix select() 系统调用,当传递给它的任何文件描述符上的数据准备就绪时,该系统调用就会被唤醒。 RabbitMQ 有类似的东西吗?

我当前的解决方案是一个适配器,它启动一个线程来阻塞每个输入交换;收到后,每个线程都会写入 java.util.concurrent 集合;我使用另一个线程来阻止该集合并在消息到达最终消费者时传递消息。它工作得很好,但如果我能消除这种复杂性,那就太好了。

这些帖子围绕着这个问题跳舞,如果我在这些帖子中忽略了它,请随意在解决方案中摸索我的 Nose :

对于java: RabbitMQ by Example: Multiple Threads, Channels and Queues

对于 C#: Reading from multiple queues, RabbitMQ

提前致谢。

最佳答案

谢谢 robthewolf 的评论。是的,我已经阅读了教程,我知道每个消费者需要一个线程。

事实证明,使用单个线程从多个交换器中读取数据非常简单,根本不需要轮询:获取一个新队列,并将其绑定(bind)到所有相关交换器。适用于主题和扇出。使用 SSCE 对此进行了测试,请参见下文。

我对 RabbitMQ javadoc 中缺乏细节感到遗憾,Channel#queueBind(String, String, String) 方法中的一些选择词会有很大帮助。

HTH

package rabbitExample;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;

/**
* Demonstrates reading messages from two exchanges via a single queue monitored
* by a single thread.
*
*/
public class MultiExchangeReadTest implements Runnable {

private final String exch1 = "my.topic.exchange";
private final String exch2 = "my.fanout.exchange";
private final Channel channel;
private final QueueingConsumer consumer;

public MultiExchangeReadTest(final String mqHost) throws Exception {

// Connect to server
System.out.println("Connecting to host " + mqHost);
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(mqHost);
Connection connection = factory.newConnection();
channel = connection.createChannel();

// Declare exchanges; use defaults for durable etc.
channel.exchangeDeclare(exch1, "topic");
channel.exchangeDeclare(exch2, "fanout");

// Get a new, unique queue name
final String queue = channel.queueDeclare().getQueue();

// Bind the queue to the exchanges; topic gets non-empty routing key
channel.queueBind(queue, exch1, "my.key");
channel.queueBind(queue, exch2, "");

// Configure the channel to fetch one message at a time, auto-ACK
channel.basicQos(1);
consumer = new QueueingConsumer(channel);
channel.basicConsume(queue, true, consumer);
}

public void run() {
// Reads messages until interrupted
try {
while (true) {
// Wait for a message
System.out.println("Awaiting message");
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
// Show contents using default encoding scheme
String body = new String(delivery.getBody());
System.out.println("Message from exch "
+ delivery.getEnvelope().getExchange() + ", key '"
+ delivery.getEnvelope().getRoutingKey() + "':\n"
+ body);
} // while
} catch (Exception ex) {
ex.printStackTrace();
}
}

public static void main(String[] args) throws Exception {
if (args.length != 1) {
System.err
.println("Usaage: MultiExchangeReadTest.main mq-host-name");
} else {
MultiExchangeReadTest multiReader = new MultiExchangeReadTest(
args[0]);
multiReader.run();
}
}
}

关于java - 从 Java 客户端中的多个 RabbitMQ 交换中读取,无需轮询,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/22362164/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com