- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我想使用 SI 中的聚合器端点根据消息主题聚合 MQTT 消息,并在收到所有部分(一些陀螺仪值:X、Y 和 Z)时释放聚合消息,到目前为止没有问题。 。 有用。但我想添加一个组超时,这样当我在一段时间内没有收到所有 3 个值时,消息将被丢弃,我可以等待新消息。
我的工作代码:
配置:
@SpringBootApplication
public class MqttListenerApplication {
public static void main(String[] args) {
SpringApplication.run(MqttListenerApplication.class, args);
}
@Bean
public MessageChannel mqttInputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel filterOutputChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel aggregatorOutputChannel() {
return new DirectChannel();
}
@Bean
public MqttPahoClientFactory clientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
MqttConnectOptions options = new MqttConnectOptions();
options.setUserName("demo:application");
options.setPassword("PwdApps".toCharArray());
factory.setConnectionOptions(options);
return factory;
}
@Bean
public MessageProducer inbound() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://mqtt2.thingsplay.com:1883", "test-007", clientFactory(),"#");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
adapter.setOutputChannel(mqttInputChannel());
return adapter;
}
}
过滤端点:
@MessageEndpoint
public class MqttFilter {
@Filter(
inputChannel = "mqttInputChannel",
outputChannel = "filterOutputChannel"
)
public boolean isValid(@Header("mqtt_receivedTopic") String topic, Message<?> message) {
if (topic.contains("testbw")) {
System.out.println("------ Valid Message ! ------");
return true;
} else {
return false;
}
}
}
聚合器端点:
@MessageEndpoint
public class GyroAggregator {
private static final Logger logger = LogManager.getLogger();
@Aggregator(
inputChannel = "filterOutputChannel",
outputChannel = "aggregatorOutputChannel"
)
public GyroCompleted aggregate(List<Message<?>> messages) {
GyroCompleted gyroCompleted = new GyroCompleted();
for (Message<?> message : messages) {
String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
if (topic.contains("ACCX")) {
gyroCompleted.setAcc_x(Integer.valueOf((String) message.getPayload()));
} else if (topic.contains("ACCY")) {
gyroCompleted.setAcc_y(Integer.valueOf((String) message.getPayload()));
} else if (topic.contains("ACCZ")) {
gyroCompleted.setAcc_z(Integer.valueOf((String) message.getPayload()));
}
}
return gyroCompleted;
}
@ReleaseStrategy
public boolean hasAllAxes(List<Message<?>> messages) {
logger.debug("In Release Strategy method.");
logger.debug(messages);
boolean x = false, y = false, z = false;
for (Message<?> message : messages) {
String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
if (topic.contains("ACCX")) {
x = true;
} else if (topic.contains("ACCY")) {
y = true;
} else if (topic.contains("ACCZ")) {
z = true;
}
}
logger.debug("Release Strategy method returning {}", x && y && z);
return x && y && z;
}
@CorrelationStrategy
public String correlateBy(@Header("mqtt_receivedTopic") String topic, Message<?> message) {
logger.debug("In Correlation Strategy method.");
String deviceId = topic.substring(0, topic.indexOf("/"));
logger.debug("Correlation Strategy returning Key : {}", deviceId);
return deviceId;
}
}
回显端点:
@MessageEndpoint
public class EchoServiceActivator {
private static final Logger logger = LogManager.getLogger();
@ServiceActivator(
inputChannel = "aggregatorOutputChannel"
)
public void echo(Message<?> message) {
logger.debug("Echo : " + message);
}
}
但是对于组超时点,我无法使其工作...尽管文档这么说,但没有通过注释进行配置:
All of the configuration options provided by the xml element are also available for the @Aggregator annotation.
但是下面的几行是这样说的:
Annotation configuration (@Aggregator and others) for the Aggregator component covers only simple use cases, where most default options are sufficient. If you need more control over those options using Annotation configuration, consider using a @Bean definition for the AggregatingMessageHandler and mark its @Bean method with @ServiceActivator
问题是我无法让 @Bean 工作...
我试图将它放在一个用@MessageEndpoint注释的类中,但它也不起作用。我认为它会 Autowiring 聚合器的所有组件。
我怎样才能让它发挥作用?
最佳答案
使用 Java DSL 更容易。像这样的东西:
@Bean
public IntegrationFlow aggregatorFlow(GyroAggregator agg) {
return IntegrationFlows.from("filterOutputChannel")
.aggregate(a -> a
.processor(agg)
.groupTimeout(500L))
.channel("aggregatorOutputChannel")
.get();
}
当然,您可以将 MQTT 适配器和过滤器连接到同一流中。
如果要将处理程序定义为 @Bean
,请在构造函数中使用 new SimpleMessageStore()
。
关于java - 如何通过 Aggregator 的 Java 配置使用组超时?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51654331/
wait() 和 wait(timeout) 之间有什么区别。无论如何 wait() 需要等待通知调用,但为什么我们有 wait(timeout)? 那么 sleep(timeout) 和 wait(
如何向以下脚本添加超时?我希望它将文本显示为“超时”。 var bustcachevar = 1 //bust potential caching of external pages after in
我正在使用 Firebase once() 方法来检索 React Native 移动应用中的值。问题是,如果手机离线,once() 永远不会返回。文档说 ref.off() 方法应该取消回调,但这似
我在一个表中有一个大型数据集(超过 200 万行,每行超过 100 列),存储在 cassandra 中,几个月前(也许是 2 个月?)我能够执行一个简单的命令来跟踪该表中的记录数量: SELECT
我使用 jquery 开发移动应用程序,下面是我的代码,当我向包含的页面添加 5 或 6 行时,一切正常。但如果我添加多行显示错误消息:Javascript 执行超时。 function succes
我正在使用一个 javascript 确认,它将在 15 分钟后重复调用。如果用户未选择确认框中的任何选项我会在等待 1 分钟后重定向他。如何实现这一目标?我的代码是这样的 var timeo
每次我在沙箱环境中运行这段代码时,我都会超时并最终崩溃。我已经通过多个 IDE 运行它,但仍然找不到任何语法错误。如果有人看到了我没有看到的东西,我将非常感谢您的意见。 //assign variab
更新联系人后我会显示一条消息,1500 毫秒后我会转到另一个页面。我是这样做的: onSubmit() { if (this.form.valid) {
从昨天开始,我拼命尝试使用最新版本的 PHPMailer 运行一个非常简单的电子邮件脚本。 最荒谬的是,同一个脚本在两台服务器上不起作用,但在另一台服务器上却起作用。 这是我的尝试(来自 PHPMai
我已阅读以下 2 篇文章并尝试实现相同的文章。 我的代码是这样的,超时发生在这里 HttpWebRequest wr = (HttpWebRequest)WebRequest.Create(url);
我正在尝试连接到 wsdl 服务, 但收到此错误: wsdl 错误:获取 http://api.didww.com/api/?wsdl - HTTP 错误: header 的套接字读取超时 本地没有问
我在使用 Ansible 的 CentOs7 实例上从 Artifactory 下载 jar 文件时遇到问题。这是我第一次在 Linux 实例上这样做。 我在每个 Windows 实例上都使用了 wi
在过去的两天里,我一直在寻找原因,我在互联网上和堆栈上尝试了很多解决方案。 我有一个带有 ubuntu 16.04 和 apache2 的专用 VM -> 服务器版本:Apache/2.4.18 (U
我正处于构建 PHP 应用程序的早期阶段,其中一部分涉及使用 file_get_contents()从远程服务器获取大文件并将它们传输给用户。例如,要获取的目标文件是 200 mB。 如果下载到服务器
我正在尝试连接到本地网络内的路由器。到目前为止,我已经使用了 TcpClient。 检查我的代码: public static void RouterConnect() {
我正在尝试构建一段代码来搜索使用 Mechanize 和 Ruby 超时的页面。我的测试台包括一个专门写入超时的页面,以及 3 个正常运行的页面。这是代码: urls = ['http://examp
我是 python 的新手,也是语义网查询领域的新手。我正在使用 SPARQLWrapper 库查询 dbpedia,我搜索了库文档但未能找到从 sparqlWrapper 触发到 dbpedia 的
我正在从 GenServer 中的句柄信息功能调用 elixir genserver 以添加电话号码获取表单客户端。但是一旦调用了handle_call,所有者进程就会崩溃[超时]。请帮忙。 全局创建
假设我的 WCF 服务中有以下执行链: ServiceMethod 调用并等待 Method1,然后调用并等待 Method2,后者调用并等待 Method3。最后 ServiceMethod 在返回
目前我正在开发一个从远程服务器发送和接收文件的应用程序。为了进行网络操作,我正在使用 QNetworkAccessManager。 要上传文件,我使用 QNetworkAccessManager::p
我是一名优秀的程序员,十分优秀!