gpt4 book ai didi

java - RabbitMQ - Java Spring - 如何初始化交换到多个队列?

转载 作者:行者123 更新时间:2023-11-30 10:43:03 25 4
gpt4 key购买 nike

我很难找到一种 Spring 方法来初始化将传入消息发送到多个队列的交换 - 在我的 Spring-boot 应用程序上:

我找不到定义秒交换队列绑定(bind)的好方法。

我使用 RabbitTemplate 作为生产者客户端。

RabbitMQ 6 页教程并没有真正帮助,因为:

  1. 消费者按需提供的唯一几个初始临时队列(而我需要生产者进行绑定(bind) - 到持久队列)
  2. 这些示例用于基本的 java 用法 - 不使用 Spring 功能。

我也没有找到如何通过 spring AMQP 页面实现它。

到目前为止我得到的是尝试将基本的 java 绑定(bind)注入(inject) spring 的方式 - 但它不起作用......

@Bean
public ConnectionFactory connectionFactory() throws IOException {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");

Connection conn = connectionFactory.createConnection();
Channel channel = conn.createChannel(false);

channel.exchangeDeclare(SPRING_BOOT_EXCHANGE, "fanout");
channel.queueBind(queueName, SPRING_BOOT_EXCHANGE, ""); //first bind
channel.queueBind(queueName2, SPRING_BOOT_EXCHANGE, "");// second bind

return connectionFactory;
}

任何帮助将不胜感激

已编辑

认为问题出现的事实是,每次我重新启动服务器时,它都会尝试重新定义交换查询绑定(bind)——而它们仍然存在于代理中……我设法通过代理 UI 控制台手动定义它们 - 因此生产者只知道交换名称,而消费者只知道它的相关队列。有没有一种方法可以以编程方式定义这些元素 - 但如果在以前的重启中已经存在,那么它不会被重新定义\覆盖?

最佳答案

我们使用类似于下面的方法将数据从一个特定的输入 channel 发送到其他消费者的多个输入队列:

@Bean
public IntegrationFlow integrationFlow(final RabbitTemplate rabbitTemplate, final AmqpHeaderMapper amqpHeaderMapper) {
IntegrationFlows
.from("some-input-channel")
.handle(Amqp.outboundAdapter(rabbitTemplate)
.headerMapper(headerMapper))
.get()
}

@Bean
public AmqpHeaderMapper amqpHeaderMapper() {
final DefaultAmqpHeaderMapper headerMapper = new DefaultAmqpHeaderMapper();
headerMapper.setRequestHeaderNames("*");
return headerMapper;
}

@Bean
public ConnectionFactory rabbitConnectionFactory() {
return new CachingConnectionFactory();
}

@Bean
public RabbitAdmin rabbitAdmin(final ConnectionFactory rabbitConnectionFactory) {
final RabbitAdmin rabbitAdmin = new RabbitAdmin(rabbitConnectionFactory);
rabbitAdmin.afterPropertiesSet();
return rabbitAdmin;
}

@Bean
public RabbitTemplate rabbitTemplate(final ConnectionFactory rabbitConnectionFactory, final RabbitAdmin rabbitAdmin) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);

final FanoutExchange fanoutExchange = new FanoutExchange(MY_FANOUT.getFanoutName());
fanoutExchange.setAdminsThatShouldDeclare(rabbitAdmin);
for (final String queueName : MY_FANOUT.getQueueNames) {
final Queue queue = new Queue(queueName, true);
queue.setAdminsThatShouldDeclare(rabbitAdmin);

final Binding binding = BindingBuilder.bind(queue).to(fanoutExchange);
binding.setAdminsThatShouldDeclare(rabbitAdmin);
}
rabbitTemplate.setExchange(fanoutExchange);
}

为了完整起见,这里是扇出声明的枚举:

public enum MyFanout {
MY_FANOUT(Lists.newArrayList("queue1", "queue2"), "my-fanout"),

private final List<String> queueNames;
private final String fanoutName;

MyFanout(final List<String> queueNames, final String fanoutName) {
this.queueNames = requireNonNull(queueNames, "queue must not be null!");
this.fanoutName = requireNonNull(fanoutName, "exchange must not be null!");
}

public List<String> getQueueNames() {
return this.queueNames;
}

public String getFanoutName() {
return this.fanoutName;
}
}

希望对您有所帮助!

关于java - RabbitMQ - Java Spring - 如何初始化交换到多个队列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37906043/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com