
java - Receiving and sending Java objects with Spring AMQP

Reposted. Author: 行者123. Updated: 2023-12-01 19:52:09

I want to implement a Spring AMQP example that sends and receives Java objects using a listener. I tried this:

Sending a Java object:

ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false)).to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));
AmqpTemplate template = new RabbitTemplate(connectionFactory);

TransactionsBean obj = new TransactionsBean();
obj.setId(Long.valueOf(111222333));

Receiving it and sending back another Java object:

ConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
AmqpAdmin admin = new RabbitAdmin(connectionFactory);
admin.declareBinding(BindingBuilder.bind(new Queue(QUEUE_PROCESSING_TRANSACTION, false))
        .to(new TopicExchange(EXCHANGE_PROCESSING)).with(ROUTING_KEY_PROCESSING_TRANSACTION));
AmqpTemplate template = new RabbitTemplate(connectionFactory);

TransactionsBean obj = (TransactionsBean) template.receiveAndConvert(QUEUE_PROCESSING_TRANSACTION);
System.out.println(" !!!!!!! Received id " + obj.getTransaction_id());

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueues(new Queue(QUEUE_PROCESSING_TRANSACTION, false));

container.setMessageListener(new MessageListener() {
    @Override
    public void onMessage(Message message) {
        // Receive here Java object and send back another object
    }
});

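Can you show me how to extend this code using just a simple listener, without complex annotations?

Best answer

The simplest way is to use @RabbitListener. It is even easier with Spring Boot, because Boot wires up the infrastructure beans (template, admin, and so on) for you.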

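import java.io.Serializable;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So51009346Application {

    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }

    // Sends a request to exchange "ex" with routing key "rk" and blocks for the reply.
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
            System.out.println(reply);
        };
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }

    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(te()).with("rk");
    }

}

class RequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

class ReplyObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

@Component
class Listener {

    // The return value is sent back to the caller as the reply.
    @RabbitListener(queues = So51009346Application.QUEUE_PROCESSING_TRANSACTION)
    public ReplyObject process(RequestObject ro) {
        return new ReplyObject();
    }

}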
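The request and reply classes above implement Serializable because, with no other converter configured, RabbitTemplate falls back to Java serialization for object payloads. The sketch below shows only that round trip using the JDK, with no broker involved. SerializationRoundTrip and DemoRequest are hypothetical names invented for this illustration, and the id field is an assumption; none of them appear in the answer.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical demo class: JDK-only illustration of the serialization round trip.
class SerializationRoundTrip {

    // Hypothetical payload standing in for RequestObject; the id field is an assumption.
    static class DemoRequest implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;

        DemoRequest(long id) {
            this.id = id;
        }
    }

    // Turns a Serializable payload into the bytes that would form a message body.
    static byte[] toBytes(Serializable payload) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    // Rebuilds the object on the receiving side from the raw bytes.
    static Object fromBytes(byte[] body) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Payload class is not on the classpath", e);
        }
    }

    public static void main(String[] args) {
        byte[] body = toBytes(new DemoRequest(111222333L));
        DemoRequest copy = (DemoRequest) fromBytes(body);
        System.out.println("Round-tripped id: " + copy.id);
    }
}
```

Both sides need the same class on the classpath for this to work, which is why the sender and the listener in the answer share RequestObject and ReplyObject.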

If for some reason you don't want to use the annotation, you can wire up the container with a MessageListenerAdapter:

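import java.io.Serializable;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

@SpringBootApplication
public class So51009346Application {

    public static final String QUEUE_PROCESSING_TRANSACTION = "q1";

    public static void main(String[] args) {
        SpringApplication.run(So51009346Application.class, args);
    }

    // Sends a request to exchange "ex" with routing key "rk" and blocks for the reply.
    @Bean
    public ApplicationRunner runner(RabbitTemplate template) {
        return args -> {
            ReplyObject reply = (ReplyObject) template.convertSendAndReceive("ex", "rk", new RequestObject());
            System.out.println(reply);
        };
    }

    // The adapter calls listener.process(...) for each message and sends the result back as the reply.
    @Bean
    public SimpleMessageListenerContainer container(ConnectionFactory cf, Listener listener) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(cf);
        container.setQueueNames(QUEUE_PROCESSING_TRANSACTION);
        container.setMessageListener(new MessageListenerAdapter(listener, "process"));
        return container;
    }

    @Bean
    public Queue queue() {
        return new Queue(QUEUE_PROCESSING_TRANSACTION);
    }

    @Bean
    public TopicExchange te() {
        return new TopicExchange("ex");
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(te()).with("rk");
    }

}

class RequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

class ReplyObject implements Serializable {

    private static final long serialVersionUID = 1L;

}

@Component
class Listener {

    public ReplyObject process(RequestObject ro) {
        return new ReplyObject();
    }

}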

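You can, of course, wire up the container yourself with the adapter, as in your question, but it is generally better to let Spring manage it as a @Bean; otherwise you will miss some features (for example, event publishing for failures and idle containers). The adapter takes a reference to your request/reply listener and the name of the method to invoke.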
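To make the "listener reference plus method name" idea concrete, here is a minimal JDK-only sketch of how such an adapter could dispatch a payload by reflection. It is not Spring's MessageListenerAdapter and does not reproduce its behavior (no message conversion, no reply routing). ReflectiveAdapterSketch, DemoListener, DemoRequest and DemoReply are hypothetical names used only for this illustration.

```java
import java.lang.reflect.Method;

// Hypothetical stand-in for the idea behind an adapter; not Spring's implementation.
class ReflectiveAdapterSketch {

    // Hypothetical request/reply types standing in for RequestObject and ReplyObject.
    static class DemoRequest {
    }

    static class DemoReply {
    }

    // A plain listener with no messaging annotations or interfaces.
    static class DemoListener {
        public DemoReply process(DemoRequest request) {
            return new DemoReply();
        }
    }

    private final Object delegate;
    private final String methodName;

    ReflectiveAdapterSketch(Object delegate, String methodName) {
        this.delegate = delegate;
        this.methodName = methodName;
    }

    // Looks up a public method by name and exact parameter type, then invokes it.
    // Assumption: the payload's runtime class matches the declared parameter type exactly.
    Object handle(Object payload) {
        try {
            Method method = delegate.getClass().getMethod(methodName, payload.getClass());
            return method.invoke(delegate, payload);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot invoke " + methodName, e);
        }
    }

    public static void main(String[] args) {
        ReflectiveAdapterSketch adapter = new ReflectiveAdapterSketch(new DemoListener(), "process");
        Object reply = adapter.handle(new DemoRequest());
        System.out.println(reply.getClass().getSimpleName());
    }
}
```

The point of the design is that the listener stays a plain object with an ordinary method; only the adapter knows about the messaging side.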

Regarding "java - Receiving and sending Java objects with Spring AMQP", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51009346/
