gpt4 book ai didi

java - RabbitMQ + Spring AMQP: Receiving multiple queues in SimpleMessageListenerContainer

转载 作者:行者123 更新时间:2023-12-01 17:26:49 27 4
gpt4 key购买 nike

我目前正在学习 RabbitMQ + Spring AMQP 的相关知识,我希望我的 SimpleMessageListenerContainer 能够读取发送到我的两个队列的消息,但只有其中一个队列收到了消息。如果您对代码有任何进一步的建议,或者有更好的实现方式,请告诉我,那就太好了。

编辑:问题是代码不会向两个队列发送消息,仅向其中一个队列发送消息。

基本上我从两个队列接收数据,一个队列在数据库中插入一些内容,另一个队列将接收验证请求。我希望稍后发送对路由 key 和交换的响应。

这是我的 Application.java:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

/**
 * Spring Boot entry point and RabbitMQ wiring for the challenge application.
 *
 * <p>Declares two durable queues, binds each of them to a single direct exchange using the
 * queue name as routing key, and registers one listener container that consumes from both
 * queues and forwards every message to {@code Receiver.receiveMessage}.
 */
@SpringBootApplication
public class ChallengeApplication {

    // Names are kept as-is because other classes (e.g. Runner) reference them directly.
    static final String responseExchange = "response.exchange";
    static final String routingKey = "response.routing.key";
    static final String queueInsertion = "insertion.queue";
    static final String queueValidation = "validation.queue";

    public static void main(String[] args) {
        SpringApplication.run(ChallengeApplication.class, args);
    }

    /** Connection factory pointing at a local broker with the default guest account. */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    /** Durable queue for whitelist insertion requests. */
    @Bean
    Queue queueInsertion() {
        return new Queue(queueInsertion, true);
    }

    /** Durable queue for URL validation requests. */
    @Bean
    Queue queueValidation() {
        return new Queue(queueValidation, true);
    }

    /** Direct exchange both queues are bound to. */
    @Bean
    DirectExchange exchange() {
        return new DirectExchange(responseExchange);
    }

    /** Routes messages whose routing key equals the insertion queue name to that queue. */
    @Bean
    Binding bindingInsertion(@Qualifier("queueInsertion") Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueInsertion);
    }

    /**
     * Routes messages whose routing key equals the validation queue name to that queue.
     *
     * <p>This factory method used to be called {@code queueValidation}, the same name as the
     * {@link Queue} factory method above. A {@code @Bean} method's name is its bean name, so
     * the two definitions collided under one name and could not both take effect. Giving the
     * binding its own name lets the queue and the binding be registered side by side.
     */
    @Bean
    Binding bindingValidation(@Qualifier("queueValidation") Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(queueValidation);
    }

    /**
     * Listener container with three concurrent consumers that reads from both the validation
     * and the insertion queue and hands every message to the given adapter.
     */
    @Bean
    SimpleMessageListenerContainer containerValidation(ConnectionFactory connectionFactory,
            MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setConcurrentConsumers(3);
        container.setQueueNames(queueValidation, queueInsertion);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    /** Adapter that delegates incoming messages to {@code Receiver.receiveMessage}. */
    @Bean
    MessageListenerAdapter listenerAdapter(Receiver receiver) {
        return new MessageListenerAdapter(receiver, "receiveMessage");
    }

}

正如您所看到的,我的 SimpleMessageListenerContainer 在 setQueueNames 中设置了两个我想监听的队列。我的 MessageListenerAdapter 会把消息转交给 Receiver.java 的 receiveMessage 方法。另一个问题是:如何得知收到的消息来自哪个队列?这种做法可行吗,还是我应该使用 @RabbitListener 分别监听每个队列?

Receiver.java

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.regex.Pattern;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.stereotype.Component;

import com.axur.challenge.DAO.WhitelistDAO;
import com.axur.challenge.formatters.InputData;
import com.axur.challenge.model.Whitelist;
import com.google.gson.Gson;
import com.google.gson.JsonParseException;

/**
 * Handles the JSON messages delivered by the listener container.
 *
 * <p>A payload without a {@code url} is treated as a whitelist insertion request; a payload
 * with a {@code url} is treated as a validation request.
 */
@Component
public class Receiver {

    @Autowired
    private WhitelistDAO whitelistDAO;

    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Parses the incoming JSON payload and dispatches it to the matching handler.
     *
     * @param message raw JSON text received from the queue
     */
    public void receiveMessage(String message) {
        try {
            InputData inputData = new Gson().fromJson(message, InputData.class);
            if (inputData == null) {
                // Gson yields null (not an exception) for an empty or literal "null" payload.
                System.out.println("It was not possible to read the input");
                return;
            }
            if (inputData.getUrl() == null) {
                receivedInsertion(inputData);
            } else {
                receivedValidation(inputData);
            }
        } catch (JsonParseException e) {
            System.out.println("It was not possible to read the input");
        } finally {
            // Release waiters even when a handler fails, so callers are not left to time out.
            latch.countDown();
        }
    }

    /**
     * Stores the whitelist entry described by {@code inputData} unless an equal entry is
     * already present. A missing client is stored under the client name {@code "global"}.
     *
     * @param inputData parsed insertion request
     */
    public void receivedInsertion(InputData inputData) {
        Whitelist whitelist = new Whitelist();
        String client = inputData.getClient();
        whitelist.setClient(client == null ? "global" : client);
        whitelist.setRegex(inputData.getRegex());

        System.out.println("Whitelist: " + whitelist);

        try {
            Whitelist selectedWhitelist =
                    whitelistDAO.getSpecificWhitelist(whitelist.getClient(), whitelist.getRegex());
            System.out.println("selectedWhitelist: " + selectedWhitelist);
            if (selectedWhitelist == null || !selectedWhitelist.equals(whitelist)) {
                whitelistDAO.insertWhitelist(whitelist);
                System.out.println("Whitelist Added");
            } else {
                System.out.println("Already added before");
            }
        } catch (DataAccessException dae) {
            System.out.println("Whitelist NOT Added");
            System.err.println(dae);
        }
    }

    /**
     * Checks whether the URL in {@code inputData} matches any whitelist regex returned by the
     * DAO for the request's client and regex.
     *
     * @param inputData parsed validation request
     */
    public void receivedValidation(InputData inputData) {
        List<Whitelist> listWhitelist =
                whitelistDAO.getWhitelist(inputData.getClient(), inputData.getRegex());

        boolean match = false;
        if (listWhitelist != null) {
            // Stop at the first expression that matches the URL.
            for (Whitelist candidate : listWhitelist) {
                if (checkRegex(candidate.getRegex(), inputData.getUrl())) {
                    match = true;
                    break;
                }
            }
        }

        if (match) {
            // TODO: publish a positive validation response (branch was empty in the original).
        } else {
            // TODO: publish a negative validation response (branch was empty in the original).
        }
    }

    /**
     * Tells whether {@code regex} matches anywhere inside {@code url}.
     *
     * @param regex regular expression to apply; {@code null} never matches
     * @param url   text to search; {@code null} never matches
     * @return {@code true} if the expression is found in the URL, {@code false} otherwise
     */
    public boolean checkRegex(String regex, String url) {
        if (regex == null || url == null) {
            // Guard against incomplete rows/requests instead of failing with a NullPointerException.
            return false;
        }
        return Pattern.compile(regex).matcher(url).find();
    }

    /** Returns the latch that is counted down after each processed message. */
    public CountDownLatch getLatch() {
        return latch;
    }

}

这是 Runner.java,我将消息发送到每个队列来测试它:

import java.util.concurrent.TimeUnit;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

/**
 * Startup runner that publishes one test message per queue and then waits briefly for the
 * receiver to signal that a message was processed.
 */
@Component
public class Runner implements CommandLineRunner {

    private final RabbitTemplate rabbitTemplate;
    private final Receiver receiver;

    public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
        this.rabbitTemplate = rabbitTemplate;
        this.receiver = receiver;
    }

    /**
     * Sends the same JSON payload to the exchange twice, once with the insertion queue name
     * and once with the validation queue name as routing key, then waits up to three seconds
     * on the receiver's latch.
     */
    @Override
    public void run(String... args) throws Exception {
        final String exchange = ChallengeApplication.responseExchange;
        final String payload = "{'client':null, 'regex':'[a-z]'}";

        System.out.println("Sending message...");
        rabbitTemplate.convertAndSend(exchange, ChallengeApplication.queueInsertion, payload);
        rabbitTemplate.convertAndSend(exchange, ChallengeApplication.queueValidation, payload);
        receiver.getLatch().await(3, TimeUnit.SECONDS);
    }
}

感谢您的宝贵时间!

最佳答案

就您而言,我认为您应该使用 @RabbitListener,并将队列名称作为参数传递,如下所示:

 /**
  * Logs each message delivered from the queue named by {@code RabbitTopicConfig.QUEUE_NAME}.
  *
  * @param message the message handed over by the listener infrastructure
  */
 @RabbitListener(queues = RabbitTopicConfig.QUEUE_NAME)
 public void consumerDefaultMessage(final Message message) {
     final String rendered = message.toString();
     LOGGER.info("Recive Message with default configuration {}", rendered);
 }

您应该在配置类中将 RabbitTemplate 声明为 bean 以便注入;这样,有多少个队列,您就可以使用多少个监听器:

    /**
     * Builds the {@link RabbitTemplate} on top of the given connection factory and installs
     * the converter returned by {@code producerMessageConverter()}.
     *
     * @param connectionFactory connection factory the template sends through
     * @return the configured template
     */
    @Bean
    public RabbitTemplate rabbitTemplate(final ConnectionFactory connectionFactory) {
        final RabbitTemplate template = new RabbitTemplate(connectionFactory);
        template.setMessageConverter(producerMessageConverter());
        return template;
    }

关于java - RabbitMQ+SpringAMPQ : Receiving multiple queues in SimpleMessageListenerContainer,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61199808/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com