- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我目前正在尝试通过消息代理 RabbitMQ 进行通信时可能发生的故障场景。目标是评估如何使此类沟通更具弹性。
特别是,我想在producer-commit mode中发送消息时触发nack(不确认)确认。为此,我通过 Spring AMQP 的 RabbitTemplate.send
向不存在的交换发送一条消息。在通过 RabbitTemplate.setConfirmCallback 提供的回调中,我然后通过将消息重新发送到现有交换来处理 ack=false 确认(模拟我处理了 nack 原因)。
下面提供了一个示例类和相关测试,完整的示例项目可以在my github repository中找到。我使用 RabbitMQ 3.6 和 Spring Boot/AMQP 2.0.2。
运行测试时,会按预期使用 ack=false
调用回调。但是,重新发送消息在重新创建 channel 时会挂起(10 分钟后出现超时异常)。下面提供了调用堆栈和日志的转储。
问题的解决方案似乎是按照建议的 here 在不同的线程中发送消息。 。如果您在测试中取消注释 service.runInSeparateThread = true;
行,则一切正常!
但是,除了上述帖子之外,我既不真正理解为什么事情不起作用,也没有在任何地方读到过有关这种做法的信息。这是预期的行为还是错误?谁能详细解释一下吗?
非常感谢您的建议!
调用堆栈快照:
"AMQP Connection 127.0.0.1:5672@3968" prio=5 tid=0xe nid=NA waiting
java.lang.Thread.State: WAITING
at java.lang.Object.wait(Object.java:-1)
at com.rabbitmq.utility.BlockingCell.get(BlockingCell.java:73)
at com.rabbitmq.utility.BlockingCell.uninterruptibleGet(BlockingCell.java:120)
at com.rabbitmq.utility.BlockingValueOrException.uninterruptibleGetValue(BlockingValueOrException.java:36)
at com.rabbitmq.client.impl.AMQChannel$BlockingRpcContinuation.getReply(AMQChannel.java:494)
at com.rabbitmq.client.impl.AMQChannel.privateRpc(AMQChannel.java:288)
at com.rabbitmq.client.impl.AMQChannel.exnWrappingRpc(AMQChannel.java:138)
at com.rabbitmq.client.impl.ChannelN.open(ChannelN.java:133)
at com.rabbitmq.client.impl.ChannelManager.createChannel(ChannelManager.java:176)
at com.rabbitmq.client.impl.AMQConnection.createChannel(AMQConnection.java:542)
at org.springframework.amqp.rabbit.connection.SimpleConnection.createChannel(SimpleConnection.java:57)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createBareChannel(CachingConnectionFactory.java:1156)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.access$200(CachingConnectionFactory.java:1144)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.doCreateBareChannel(CachingConnectionFactory.java:585)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.createBareChannel(CachingConnectionFactory.java:568)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getCachedChannelProxy(CachingConnectionFactory.java:538)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.getChannel(CachingConnectionFactory.java:520)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory.access$1500(CachingConnectionFactory.java:94)
at org.springframework.amqp.rabbit.connection.CachingConnectionFactory$ChannelCachingConnectionProxy.createChannel(CachingConnectionFactory.java:1161)
at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1803)
at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1771)
at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:859)
...
日志:
...
10:21:24.613 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitAdmin - declaring Exchange 'ExistentExchange'
10:21:24.630 [main] INFO com.example.rabbitmq.ProducerService - sending `initial Message`
10:21:24.648 [main] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - Added listener org.springframework.amqp.rabbit.core.RabbitTemplate$MockitoMock$952329793@562c877a
10:21:24.648 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Added publisher confirm channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@3013909b Shared Rabbit Connection: SimpleConnection@12db3386 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 1341] to map, size now 1
10:21:24.649 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Executing callback RabbitTemplate$$Lambda$175/1694519286 on RabbitMQ Channel: Cached Rabbit Channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1), conn: Proxy@3013909b Shared Rabbit Connection: SimpleConnection@12db3386 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 1341]
10:21:24.649 [main] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Publishing message (Body:'[B@67001148(byte[15])' MessageProperties [headers={}, contentType=application/octet-stream, contentLength=0, deliveryMode=PERSISTENT, priority=0, deliveryTag=0])on exchange [nonExistentExchange], routingKey = [nonExistentQueue]
10:21:24.659 [main] INFO com.example.rabbitmq.ProducerService - done with sending message
10:21:24.675 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) PC:Nack:(close):1
10:21:24.677 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - Sending confirm PendingConfirm [correlationData=null cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'nonExistentExchange' in vhost '/', class-id=60, method-id=40)]
10:21:24.677 [AMQP Connection 127.0.0.1:5672] INFO com.example.rabbitmq.ProducerService - In confirm callback, ack=false, cause=channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'nonExistentExchange' in vhost '/', class-id=60, method-id=40), correlationData=null
10:21:24.677 [AMQP Connection 127.0.0.1:5672] INFO com.example.rabbitmq.ProducerService - sending `resend Message`
10:21:24.678 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) PC:Nack:(close):1
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - AMQChannel(amqp://guest@127.0.0.1:5672/,1) No listener for seq:1
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Removed publisher confirm channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) from map, size now 0
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.core.RabbitTemplate - Removed publisher confirm channel: PublisherCallbackChannelImpl: AMQChannel(amqp://guest@127.0.0.1:5672/,1) from map, size now 0
10:21:24.679 [AMQP Connection 127.0.0.1:5672] DEBUG org.springframework.amqp.rabbit.support.PublisherCallbackChannelImpl - PendingConfirms cleared
ProducerService:
@Service
public class ProducerService {
static final String EXISTENT_EXCHANGE = "ExistentExchange";
private static final String NON_EXISTENT_EXCHANGE = "nonExistentExchange";
private static final String QUEUE_NAME = "nonExistentQueue";
private final Logger logger = LoggerFactory.getLogger(getClass());
private final RabbitTemplate rabbitTemplate;
private final Executor executor = Executors.newCachedThreadPool();
boolean runInSeparateThread = false;
public ProducerService(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setConfirmCallback(this::confirmCallback);
}
private void confirmCallback(CorrelationData correlationData, boolean ack, String cause) {
logger.info("In confirm callback, ack={}, cause={}, correlationData={}", ack, cause, correlationData);
if (!ack) {
if (runInSeparateThread) {
executor.execute(() -> sendMessage("resend Message", EXISTENT_EXCHANGE));
} else {
sendMessage("resend Message", EXISTENT_EXCHANGE);
}
} else {
logger.info("sending was acknowledged");
}
}
public void produceMessage() {
sendMessage("initial Message", NON_EXISTENT_EXCHANGE);
}
private void sendMessage(String messageBody, String exchangeName) {
logger.info("sending `{}`", messageBody);
rabbitTemplate.send(exchangeName, QUEUE_NAME, new Message(messageBody.getBytes(), new MessageProperties()));
logger.info("done with sending message");
}
}
ProducerServiceTest:
@RunWith(SpringRunner.class)
@ContextConfiguration(classes = {RabbitAutoConfiguration.class, ProducerService.class})
@DirtiesContext
public class ProducerServiceTest {
@Autowired
private ProducerService service;
@SpyBean
private RabbitTemplate rabbitTemplate;
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private CachingConnectionFactory cachingConnectionFactory;
@Before
public void setup() {
cachingConnectionFactory.setPublisherConfirms(true);
amqpAdmin.declareExchange(new DirectExchange(ProducerService.EXISTENT_EXCHANGE));
}
@After
public void cleanup() {
amqpAdmin.deleteExchange(ProducerService.EXISTENT_EXCHANGE);
}
@Test
public void sendMessageToNonexistentExchange() throws InterruptedException {
final CountDownLatch sentMessagesLatch = new CountDownLatch(2);
final List<Message> sentMessages = new ArrayList<>();
doAnswer(invocation -> {
invocation.callRealMethod();
sentMessages.add(invocation.getArgument(2));
sentMessagesLatch.countDown();
return null;
}).when(rabbitTemplate).send(anyString(), anyString(), any(Message.class));
// service.runInSeparateThread = true;
service.produceMessage();
sentMessagesLatch.await();
List<String> messageBodies = sentMessages.stream().map(message -> new String(message.getBody())).collect(toList());
assertThat(messageBodies, equalTo(Arrays.asList("initial Message", "resend Message")));
}
}
最佳答案
我认为这可以被视为一个错误,但它是我们缓存 channel 以提高性能的方式的产物。问题在于,尝试在为同一 channel 传递 ack 的同一线程上的 channel 上发布会导致客户端库中出现死锁。
我们有一个open issue研究解决方案(出于不同的原因);我们只是还没有抽出时间来做这件事。 AFAIK, self 们添加对确认和返回的支持以来,您是 6 年多以来第二位实现此目标的用户。
编辑
其实,这是一种不同的情况;由于 channel 已关闭,因此它不会重用该 channel 。它试图创建一个新的 channel ,但这就是僵局。我不明白我们(Spring AMQP)能做什么;这是java客户端的限制;您不能在ack线程上执行操作。
关于由于不存在交换,Java RabbitMQ 客户端在 nack 后通过生产者提交回调线程挂起重新发送,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50580507/
我有一个应用程序,当通过 eclipse 运行时,它会导致 eclipse 本身挂起。如果我将此应用程序导出为 jar 并运行它,它工作正常。但是,如果我运行(或调试)它,应用程序将显示为启动(根据
我正在将项目从 Rails2 切换到 Rails3。我跑: rails server 服务器启动没有错误: => Booting WEBrick => Rails 3.0.7 application
当我尝试使用 XCode 打开特定项目时,它挂起。当它挂起时,它显示以下屏幕: 其他项目可以正常打开,虽然挂起的项目也打开了,意味着我什么也做不了。我的 CPU 全速运行(风扇开始运转),我必须退出多
我正在使用 BNHtmlPdfKit 将 Html 呈现为 PDF。它工作得很好,但在 iOS8 中它只是挂起 [renderer drawPageAtIndex:i inRect:renderer.
我一直在尝试在 eclipse 中创建一个项目,并且有一个名为 InitRemoteEditJob 的工作正在阻止一切。它甚至没有被取消。 请建议怎么办? 最佳答案 这个错误有很多原因。 你可以试试这
我使用这个函数来发出 cURL 请求: function curl_request($options) //single custom cURL request. { $ch = curl_i
当我尝试归档某个项目时,Xcode 无法响应。如果让他一个人呆着,他会在很长一段时间后设法打开管理器。文件在那里。如果从 library/developer/xcode/archives 中手动删除,
有时我的 Eclipse 挂起,我需要猛烈地杀死它。但是,我一直无法正确地做到这一点。似乎 kill -9 实际上并没有以正确的方式关闭它,因为我仍然可以看到挂起的窗口。什么命令序列会正确杀死我的 E
我有一个JavaFX 8应用,它有时会挂起/冻结。我觉得我已经排除了造成此问题的许多原因,但它仍在发生。 不幸的是,我无法按需复制冻结/挂起。实际上,这仅发生在(到目前为止)我同事的计算机上。它可能在
我正在尝试学习网络基础知识,并且已经从this教程构建了回显服务器。我用telnet检查了服务器,它工作正常。 现在,当我使用Internet上的许多客户端示例中的一些示例时: // Create a
我正在尝试使用 SwiftUI 实现使用 Apple 登录,但在我输入正确的密码后它挂起。 我正在使用真实用户和模拟器以及 XCode 12.0 Beta。不幸的是,我现在没有可供测试的设备。我也尝试
我包括此简单的错误处理功能来格式化错误: date_default_timezone_set('America/New_York'); // Create the error handler. fun
我正在尝试为 VisualVM 安装一些插件,但它一直卡在下面的屏幕上 - 告诉我“请等待安装程序发现插件依赖项”。我运行的是 Ubuntu 12.04。当我尝试从“可用插件”列表中安装它们时,以及当
如果堆分配/取消分配/重新分配在另一个线程中进行,DbgHelp 库的 MiniDumpWriteDump() 将挂起。这是调用堆栈:DbgHelp 暂停其他线程,然后无限期地等待这些线程获得的互斥量
我正在尝试在 Eclipse C++ 版本中安装新软件。 帮助 -> 安装新软件。当我去安装新软件时,它会挂起或需要几个小时才能移动百分比。 我读到这是 JRE7 中的一个已知错误,我假设我在安装它后
这个问题已经有答案了: process.waitFor() never returns (12 个回答) 已关闭 3 年前。 我使用以下代码运行命令: open class AppRunner {
我正在尝试为 VisualVM 安装一些插件,但它一直卡在下面的屏幕上 - 告诉我“请等待安装程序发现插件依赖项”。我正在运行 Ubuntu 12.04。当我尝试从“可用插件”列表安装它们时,以及当我
如果堆分配/取消分配/重新分配在另一个线程中进行,DbgHelp 库的 MiniDumpWriteDump() 将挂起。这是调用堆栈:DbgHelp 暂停其他线程,然后无限期地等待这些线程获得的互斥量
尝试调试竞争条件,其中我们的应用程序的轮询器线程之一永远不会返回,导致 future 的轮询器永远不会被调度。用抽象术语来说,在捕获问题时隐藏我们的业务逻辑,这就是我们的代码路径。 我们必须更新远程服
我在程序完成时遇到 Java 的 ExecutorCompletionService 问题。 我需要使用 ExecutorCompletionService 而不是 ExecutorService 因
我是一名优秀的程序员,十分优秀!