gpt4 book ai didi

由于不存在交换,Java RabbitMQ 客户端在 nack 后通过生产者提交回调线程挂起重新发送

转载 作者:行者123 更新时间:2023-11-30 06:09:30 28 4
gpt4 key购买 nike

我目前正在尝试通过消息代理 RabbitMQ 进行通信时可能发生的故障场景。目标是评估如何使此类沟通更具弹性。

特别是,我想在producer-commit mode中发送消息时触发nack(不确认)确认。为此,我通过 Spring AMQP 的 RabbitTemplate.send 向不存在的交换发送一条消息。在通过 RabbitTemplate.setConfirmCallback 提供的回调中,我然后通过将消息重新发送到现有交换来处理 ack=false 确认(模拟我处理了 nack 原因)。

下面提供了一个示例类和相关测试,完整的示例项目可以在my github repository中找到。我使用 RabbitMQ 3.6 和 Spring Boot/AMQP 2.0.2。

运行测试时,会按预期使用 ack=false 调用回调。但是,重新发送消息在重新创建 channel 时会挂起(10 分钟后出现超时异常)。下面提供了调用堆栈和日志的转储。

问题的解决方案似乎是按照建议的 here 在不同的线程中发送消息。 。如果您在测试中取消注释 service.runInSeparateThread = true; 行,则一切正常!

但是,除了上述帖子之外,我既不真正理解为什么事情不起作用,也没有在任何地方读到过有关这种做法的信息。这是预期的行为还是错误?谁能详细解释一下吗?

非常感谢您的建议!

调用堆栈快照:

 "AMQP Connection 127.0.0.1:5672@3968" prio=5 tid=0xe nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:73)
at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133)
at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:176)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:542)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:57)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1156)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1144)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:585)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:568)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:538)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:520)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:94)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1161)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1803)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:859)
...

日志:

...
10:21:24.613 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitAdmin - declaring Exchange 'ExistentExchange'
10:21:24.630 [main] INFO com.example.rabbitmq.ProducerService - sending `initial Message`
10:21:24.648 [main] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate$MockitoMock$952329793@562c877a
10:21:24.648 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@3013909b Shared Rabbit Connection: SimpleConnection@12db3386 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 1341] to map, size now 1
10:21:24.649 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$175/1694519286 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@3013909b Shared Rabbit Connection: SimpleConnection@12db3386 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 1341]
10:21:24.649 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message (Body:'[B@67001148(byte[15])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [nonExistentExchange], routingKey = [nonExistentQueue]
10:21:24.659 [main] INFO com.example.rabbitmq.ProducerService - done with sending message
10:21:24.675 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) PC:Nack:(close):1
10:21:24.677 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - Sending confirm PendingConfirm [correlationData=null cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'nonExistentExchange' in vhost '/', class-id=60, method-id=40)]
10:21:24.677 [AMQP Connection 127.0.0.1:5672] INFO com.example.rabbitmq.ProducerService - In confirm callback, ack=false, cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'nonExistentExchange' in vhost '/', class-id=60, method-id=40), correlationData=null
10:21:24.677 [AMQP Connection 127.0.0.1:5672] INFO com.example.rabbitmq.ProducerService - sending `resend Message`
10:21:24.678 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) PC:Nack:(close):1
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - AMQChannel(amqp://guest@127.0.0.1:5672/,1) No listener for seq:1
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Removed publisher confirm channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) from map, size now 0
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Removed publisher confirm channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) from map, size now 0
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PendingConfirms cleared

ProducerService:

@Service
public class ProducerService {

static final String EXISTENT_EXCHANGE = "ExistentExchange";
private static final String NON_EXISTENT_EXCHANGE = "nonExistentExchange";
private static final String QUEUE_NAME = "nonExistentQueue";
private final Logger logger = LoggerFactory.getLogger(getClass());
private final RabbitTemplate rabbitTemplate;
private final Executor executor = Executors.newCachedThreadPool();
boolean runInSeparateThread = false;

public ProducerService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this::confirmCallback);
}

private void confirmCallback(CorrelationData correlationData, boolean ack, String cause) {
logger.info("In confirm callback, ack={}, cause={}, correlationData={}", ack, cause, correlationData);
if (!ack) {
if (runInSeparateThread) {
executor.execute(() -> sendMessage("resend Message", EXISTENT_EXCHANGE));
} else {
sendMessage("resend Message", EXISTENT_EXCHANGE);
}
} else {
logger.info("sending was acknowledged");
}
}

public void produceMessage() {
sendMessage("initial Message", NON_EXISTENT_EXCHANGE);
}

private void sendMessage(String messageBody, String exchangeName) {
logger.info("sending `{}`", messageBody);
rabbitTemplate.send(exchangeName, QUEUE_NAME, new Message(messageBody.getBytes(), new MessageProperties()));
logger.info("done with sending message");
}

}

ProducerServiceTest:

@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {RabbitAutoConfiguration.class, ProducerService.class})
@DirtiesContext
public class ProducerServiceTest {

@Autowired
private ProducerService service;
@SpyBean
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private CachingConnectionFactory cachingConnectionFactory;

@Before
public void setup() {
cachingConnectionFactory.setPublisherConfirms(true);
amqpAdmin.declareExchange(new DirectExchange(ProducerService.EXISTENT_EXCHANGE));
}

@After
public void cleanup() {
amqpAdmin.deleteExchange(ProducerService.EXISTENT_EXCHANGE);
}

@Test
public void sendMessageToNonexistentExchange() throws InterruptedException {
final CountDownLatch sentMessagesLatch = new CountDownLatch(2);
final List<Message> sentMessages = new ArrayList<>();
doAnswer(invocation -> {
invocation.callRealMethod();
sentMessages.add(invocation.getArgument(2));
sentMessagesLatch.countDown();
return null;
}).when(rabbitTemplate).send(anyString(), anyString(), any(Message.class));

// service.runInSeparateThread = true;
service.produceMessage();
sentMessagesLatch.await();

List<String> messageBodies = sentMessages.stream().map(message -> new String(message.getBody())).collect(toList());
assertThat(messageBodies, equalTo(Arrays.asList("initial Message", "resend Message")));
}

}

最佳答案

我认为这可以被视为一个错误,但它是我们缓存 channel 以提高性能的方式的产物。问题在于,尝试在为同一 channel 传递 ack 的同一线程上的 channel 上发布会导致客户端库中出现死锁。

我们有一个open issue研究解决方案(出于不同的原因);我们只是还没有抽出时间来做这件事。 AFAIK, self 们添加对确认和返回的支持以来,您是 6 年多以来第二位实现此目标的用户。

编辑

其实,这是一种不同的情况;由于 channel 已关闭,因此它不会重用该 channel 。它试图创建一个新的 channel ,但这就是僵局。我不明白我们(Spring AMQP)能做什么;这是java客户端的限制;您不能在ack线程上执行操作。

关于由于不存在交换,Java RabbitMQ 客户端在 nack 后通过生产者提交回调线程挂起重新发送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50580507/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com