gpt4 book ai didi

java - 使用 Spring Boot RabbitMQ 的异步 RPC

转载 作者:行者123 更新时间:2023-11-30 06:32:04 26 4
gpt4 key购买 nike

我使用 Spring Boot 1.4 和 Rabbit mq 实现了基本的异步 RPC 调用。
我的目的是用这个例子作为沟通的基础
例如,Publisher.java 和 Subscriber.java 可能是两个相互通信的微服务。

显示的代码工作正常,但我很好奇是否有更好的方法这样做吗?

我的查询如下:

  • 为了让订阅者使用 @RabbitListener 注释监听请求队列,我不必声明 directExchange()binding() bean
    但是为了让 asyncRabbitTemplate 从回复队列读取响应,我必须声明 directExchange()binding() bean在配置中。
    有什么办法可以避免它,因为我觉得这是代码重复,因为我声明了这些 bean 两次。
  • 在现实世界的应用中,微服务之间会有很多这样的调用。根据我的理解,我需要声明类似的 rpcReplyMessageListenerContainer()asyncRabbitTemplate() 对于每个请求-回复调用。
    正确吗?

代码如下。 Link to Github

Config.java

@Configuration("asyncRPCConfig")
@Profile("async_rpc")
@EnableScheduling
@EnableRabbit
@ComponentScan(basePackages = {"in.rabbitmq.async_rpc"})
public class Config {

@Value("${queue.reply}")
private String replyQueue;
@Value("${exchange.direct}")
private String directExchange;
@Value("${routingKey.reply}")
private String replyRoutingKey;

@Bean
public Publisher publisher() {
return new Publisher();
}

@Bean
public SimpleRabbitListenerContainerFactory simpleMessageListenerContainerFactory(ConnectionFactory connectionFactory,
SimpleRabbitListenerContainerFactoryConfigurer configurer) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();

configurer.configure(factory, connectionFactory);
return factory;
}

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);
template.setMessageConverter(jsonMessageConverter());
return template;
}

@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}

@Bean
public Queue replyQueueRPC() {
return new Queue(replyQueue);
}

@Bean
public SimpleMessageListenerContainer rpcReplyMessageListenerContainer(ConnectionFactory connectionFactory) {
SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
simpleMessageListenerContainer.setQueues(replyQueueRPC());
simpleMessageListenerContainer.setReceiveTimeout(2000);
simpleMessageListenerContainer.setTaskExecutor(Executors.newCachedThreadPool());
return simpleMessageListenerContainer;
}


@Bean
public AsyncRabbitTemplate asyncRabbitTemplate(ConnectionFactory connectionFactory) {

return new AsyncRabbitTemplate(rabbitTemplate(connectionFactory),
rpcReplyMessageListenerContainer(connectionFactory),
directExchange + "/" + replyRoutingKey);
}

@Bean
public DirectExchange directExchange() {
return new DirectExchange(directExchange);
}

@Bean
public Binding binding() {
return BindingBuilder.bind(replyQueueRPC()).to(directExchange()).with(replyRoutingKey);
}


@Bean
public Subscriber subscriber() {
return new Subscriber();
}

}

Publisher.java

public class Publisher {

@Value("${routingKey.request}")
private String requestRoutingKey;
@Autowired
private DirectExchange directExchange;

private static SecureRandom SECURE_RANDOM;

static {
try {
SECURE_RANDOM = SecureRandom.getInstanceStrong();
} catch (NoSuchAlgorithmException e) {
e.printStackTrace();
}
}


@Autowired
private AsyncRabbitTemplate asyncRabbitTemplate;


@Scheduled(fixedDelay = 100 * 1)
public void publishToDirectExchangeRPCStyle() {
Integer integer = SECURE_RANDOM.nextInt();
SampleRequestMessage sampleRequestMessage = new SampleRequestMessage(String.valueOf(integer));
System.out.println("Sending out message on direct directExchange:" + sampleRequestMessage);

AsyncRabbitTemplate.RabbitConverterFuture<SampleResponseMessage> sampleResponseMessageRabbitConverterFuture = asyncRabbitTemplate
.convertSendAndReceive(directExchange.getName(), requestRoutingKey, sampleRequestMessage);
sampleResponseMessageRabbitConverterFuture.addCallback(
sampleResponseMessage ->
System.out.println("Response for request message:" + sampleRequestMessage + " is:" + sampleResponseMessage)
, failure ->
System.out.println(failure.getMessage())
);

}
}

订阅者.java

public class Subscriber {

@RabbitHandler
@RabbitListener(
bindings = {
@QueueBinding(value = @Queue("${queue.request}"),
key = "${routingKey.request}",
exchange = @Exchange(value = "${exchange.direct}", type = ExchangeTypes.DIRECT, durable = "true"))})
public SampleResponseMessage subscribeToRequestQueue(@Payload SampleRequestMessage sampleRequestMessage, Message message) {
System.out.println("Received message :" + message);
return new SampleResponseMessage(sampleRequestMessage.getMessage());
}
}

最佳答案

你的解决方案很好。

不清楚你想问什么...

I had to declare directExchange() and binding() beans in configuration. Is there any way I can avoid it, because I feel it is code duplication as I am declaring these beans twice.

@QueueBinding 只是 @RabbitListener 的一种便利,也是将队列、交换和绑定(bind)声明为 @Bean 的替代方法。

如果您使用通用的@Config类,您可以简单地省略监听器上的bindings属性并使用queues = "${queue.reply} "以避免重复。

I would need to declare similar rpcReplyMessageListenerContainer() and asyncRabbitTemplate() for each request-reply call. Is that correct?

是的;尽管在即将发布的 2.0 版本中,您可以使用 DirectReplyToMessageListenerContainer ,这样就无需为每个服务使用单独的回复队列;当您发送消息时。

See the documentation herehere .

Starting with version 2.0, the async template now supports Direct reply-to instead of a configured reply queue.

(应理解为“作为”的替代品,而不是“代替”)。

因此您可以使用相同的模板与多个服务通信。

关于java - 使用 Spring Boot RabbitMQ 的异步 RPC,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45913418/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com