- Java 双重比较
- java - 比较器与 Apache BeanComparator
- Objective-C 完成 block 导致额外的方法调用?
- database - RESTful URI 是否应该公开数据库主键?
我正在使用以下 XML 片段:
<int-amqp:inbound-channel-adapter acknowledge-mode="MANUAL" channel="commandQueue" concurrent-consumers="${commandConsumers:10}"
queue-names="commands" connection-factory="connectionFactory"/>
<int:channel id="commandQueue"/>
<int:channel id="commands"/>
<int:chain input-channel="commandQueue" output-channel="commands">
<int:delayer id="commandDelayer" default-delay="30000"/>
<int:json-to-object-transformer id="commandTransformer" type="com.airwatch.chat.command.Command"/>
</int:chain>
<int:payload-type-router input-channel="commands">
....
....
它正在执行这些任务:
如果在使用上述代码的应用程序启动之前消息已经存在于命令队列中,则应用程序在启动时会在单独的线程中执行消息两次。
我想我知道为什么会这样。
一旦应用程序上下文完全初始化,Spring 就会重新安排在 DelayHandler 的消息存储中持久保存的消息。请引用以下来自 DelayHandler.java
的代码片段:
public void onApplicationEvent(ContextRefreshedEvent event) {
if (!this.initialized.getAndSet(true)) {
this.reschedulePersistedMessages();
}
}
因此,如果消息在应用程序启动之前已经存在于 RabbitMQ 队列中,则在 Spring 上下文初始化期间,消息将从队列中拾取并添加到 DelayHandler 的消息存储中。上下文初始化完成后,如果在此期间消息未从消息存储中释放,则上述代码片段会重新安排同一消息。
现在,当两个独立的线程正在执行同一条消息时,如果一个线程已执行,则应从消息存储中删除该消息,而另一个线程不应继续执行。
当线程执行时,DelayHandler.java
中的下面一段代码允许第二个线程释放重复的消息,导致对同一条消息重复执行,因为消息存储是一个实例SimpleMessageStore 并且没有进一步的检查来停止执行。
private void doReleaseMessage(Message<?> message) {
if (this.messageStore instanceof SimpleMessageStore
|| ((MessageStore) this.messageStore).removeMessage(message.getHeaders().getId()) != null) {
this.messageStore.removeMessageFromGroup(this.messageGroupId, message);
this.handleMessageInternal(message);
}
else {
if (logger.isDebugEnabled()) {
logger.debug("No message in the Message Store to release: " + message +
". Likely another instance has already released it.");
}
}
}
这是 Spring Integration 中的错误吗?
最佳答案
哦,好吧!
那真是个可爱的 bug 。
感谢您指出这一点!
请提出 JIRA issue我们将在下一个版本中解决这个问题。
我可以解释发生了什么。
所有 Spring Integration 从 Lifecycle.start()
开始工作.在你的情况下 <int-amqp:inbound-channel-adapter>
从 RabbitMQ 接收消息并将其发送到集成流。他们是delayed
.
并且仅在 start
之后应用程序上下文引发 ContextRefreshedEvent
.捕获那个甚至DelayHandler
从 messageStore
中获取所有消息并且,正如您所说,reschedules
他们。
因此,是的,对于同一条消息,我们可能有两个计划任务。
有趣的是它只适用于SimpleMessageStore
,因为它没有 removeMessage
存储到 groups
的消息的函数.
我看到了几种变体作为解决方法:
延迟 start
对于 <int-amqp:inbound-channel-adapter>
.例如,处理相同的 ContextRefreshedEvent
来自 <inbound-channel-adapter>
并发送@amqpAdapter.start()
命令消息到 <control-bus>
另一个选项自 Spring Integration 4.1 起可用,其名称为 Idempotent Receiver
.使用它你可以丢弃 duplicate
消息,我猜 idempotentKey
正是messageId
.干净的幂等接收器模式!
还有一个选项在persistent
下MessageStore
,我们真正可以依靠的地方 removeMessage
操作。
有关此事的 JIRA 票证:https://jira.spring.io/browse/INT-3560
关于java - Spring 集成 : Message released twice after delay,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/26895123/
我认为我的问题与“https://serverfault.com/q/299179”和“https://serverfault.com/q/283330/71790”有些相关,但其中任何一个都没有令我
我生成了 APK 对于我的 flutter 项目和 F:\build\app\outputs\apk\release 我有 3 种类型的 apk 文件,包括 output.json 文件。他们是: *
我们最近决定更新 Beta release 的新应用程序在 Google Play 上, 现在读完指南后,我心里有一些问题,想了解更多,我用谷歌搜索进一步了解找到了一些答案,但还有一些我不确定的东西,
我正在尝试使用发布管理作为构建版本的工具,但我很难理解码件、工具和操作之间的真正区别。有人可以分解这三个概念之间的差异以及它们如何相互配合吗? 最佳答案 由于它适用于基于代理的版本: 工具旨在提供自定
我最近完成了使用 jgitflow:release-finish 合并一个发布分支来掌握和开发。 .构建成功。 但是现在我正在尝试使用 jgitflow:releast-start 创建一个新分支.但
我一直在读到,如果一个集合“被释放”,它也会释放它的所有对象。另一方面,我还读到,一旦集合被释放,集合就会释放它的对象。 但最后一件事可能并不总是发生,正如苹果所说。系统决定是否取消分配。在大多数情况
我在具有以下布局的多模块项目上使用 maven-release-plugin: ROOT/ + parent + module1 + module2 在parent的pom中,使用modu
我正在使用 ionic 构建移动应用。 我面临一个严重的问题。 我必须使用 on-touch 和 on-release 事件,但问题是每当我触摸时,on-release 甚至也会立即触发而没有实际释放
谁能解释清楚两者之间的区别是什么.Release()和->Release() 在 CComPtr 上? 确切地说,两种情况下内存管理是如何发生的? 最佳答案 CComPtr 的operator-> 函
两个片段有什么区别? [myObj release]; 和 [myObj release]; myObj = nil; 最佳答案 如果你只是释放一个对象,那么它就会变成释放对象。 如果您尝试对已释放的
我正在运行 maven 发布插件 (org.apache.maven.plugins:maven-release-plugin:2.3.2) 并注意到当通过命令行。我想知道是否有办法关闭它。 我使用
我正在尝试通过运行nuget pack -properties Configuration=Release命令来更新我的nuget软件包,但这会给我以下错误: Unable to find 'bin/
我们正在使用 Microsoft 的发布管理将我们的 Web 应用程序部署到我们的测试环境 (QA)。它是一个直接的 MVC.Net Web 应用程序。我们的构建生成一个 web 部署包,我们有一个命
我有一个在 X 环境中发布的版本 A。另一方面,我有一个在环境 Y 中发布的版本 B。 问题是我想知道我是否可以在版本 B 中检查版本 A 的状态,这样我就可以抛出错误而不发布版本 B。 我不知道是否
我正在开发一个使用大量图像的应用程序,我正在使用 UIWebView 使用 JavaScript 代码(我正在使用 UIZE 库)来表示大约 200 张图像,问题当我完成 UIWebView 时,我在
我已阅读 Marshal.GetIUnknownForObject 的文档它说: Always use Marshal.Release to decrement the reference count
为了成为 iPhone SDK 上的好内存公民,我一直在玩内存。 然而,我仍然很难理解"self.something" 和只是"something" 之间的区别。 据我了解,"self.somethi
我需要使用 bash 找出我正在运行的 Linux 发行版。找到this page ,这非常有帮助。 但是我的系统有两个/etc/*-release 文件 /etc/lsb-release /etc/
我想使用 Maven Release Plugin 将 Release Candidates 发布到我的 Nexus Snapshot 存储库。 将 RC 部署到 Nexus 不是问题,但我想利用 m
在什么情况下我们应该使用“Latch until release”而不是“Switch until release”? 根据 LabVIEW 2011 Help : Latch until relea
我是一名优秀的程序员,十分优秀!