gpt4 book ai didi

spring-boot - RabbitMQ在事务中发送消息

转载 作者:行者123 更新时间:2023-12-04 12:46:06 24 4
gpt4 key购买 nike

是否可以在事务中运行下面的代码,以便如果在业务处理中抛出异常,我们可以回滚我们发送到队列的消息?

rabbitTemplate.convertAndSend("queue1", data);

//do some processing

rabbitTemplate.convertAndSend("queue2", data);
如果在向 queue1 发送消息后出现问题,但我们无法向 queue2 发送消息,则需要这样做。或者如果在向队列发送消息时发出网络或其他一些问题怎么办。

最佳答案

如果此代码在监听器容器线程( onMessage()@RabbitListener )上运行并且容器和模板都具有 setChannelTransacted(true)然后发布(和交付)将在同一个事务中运行;抛出异常将导致所有内容回滚。

如果这是在某个任意 java 类中(不在容器线程上运行),那么您需要在方法运行之前启动事务...

    @Transactional
public void send(String in) {
this.template.convertAndSend("foo", in);
if (in.equals("foo")) {
throw new RuntimeException("test");
}
this.template.convertAndSend("bar", in);
}

这是一个完整的 Spring Boot 应用程序,演示了该功能......
@SpringBootApplication
@EnableTransactionManagement
public class So40749877Application {

public static void main(String[] args) {
ConfigurableApplicationContext context = SpringApplication.run(So40749877Application.class, args);
Foo foo = context.getBean(Foo.class);
try {
foo.send("foo");
}
catch (Exception e) {}
foo.send("bar");
RabbitTemplate template = context.getBean(RabbitTemplate.class);
// should not get any foos...
System.out.println(template.receiveAndConvert("foo", 10_000));
System.out.println(template.receiveAndConvert("bar", 10_000));
// should be null
System.out.println(template.receiveAndConvert("foo", 0));
RabbitAdmin admin = context.getBean(RabbitAdmin.class);
admin.deleteQueue("foo");
admin.deleteQueue("bar");
context.close();
}

@Bean
public RabbitTemplate amqpTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}

@Bean
public Queue foo() {
return new Queue("foo");
}

@Bean
public Queue bar() {
return new Queue("bar");
}

@Bean
public Foo fooBean() {
return new Foo();
}

@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new RabbitTransactionManager(connectionFactory);
}

public static class Foo {

@Autowired
private RabbitTemplate template;

@Transactional
public void send(String in) {
this.template.convertAndSend("foo", in);
if (in.equals("foo")) {
throw new RuntimeException("test");
}
this.template.convertAndSend("bar", in);
}

}

}

编辑

消费者方面的交易;这在使用 Spring 时通常不适用,因为它管理事务,但是当直接使用客户端时......
Connection connection = cf.createConnection();
Channel channel = connection.createChannel(true);
channel.basicQos(1);
channel.txSelect();
CountDownLatch latch = new CountDownLatch(1);
channel.basicConsume("foo", new DefaultConsumer(channel) {

@Override
public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,
byte[] body) throws IOException {
System.out.println(new String(body));

getChannel().txRollback(); // delivery won't be requeued; remains unacked

if (envelope.isRedeliver()) {
getChannel().basicAck(envelope.getDeliveryTag(), false);
getChannel().txCommit(); // commit the ack so the message is removed
getChannel().basicCancel(consumerTag);
latch.countDown();
}
else { // first time, let's requeue
getChannel().basicReject(envelope.getDeliveryTag(), true);
getChannel().txCommit(); // commit the reject so the message will be requeued
}
}

});
latch.await();
channel.close();
connection.close();

请注意 txRollback在这种情况下什么都不做;只有确认(或拒绝)是事务性的。

关于spring-boot - RabbitMQ在事务中发送消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/40749877/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com