
java - How do I use a group timeout with Java configuration of an Aggregator?

Reposted. Author: 行者123. Updated: 2023-12-02 10:56:57

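I want to use the aggregator endpoint in Spring Integration (SI) to aggregate MQTT messages by topic and release the aggregated message once all the parts (the gyroscope values X, Y and Z) have arrived. So far so good: that works. I would now like to add a group timeout, so that if I do not receive all three values within a given period, the partial messages are discarded and I can wait for new ones.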
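To make the desired behaviour concrete, here is a minimal plain-Java sketch of "release when X, Y and Z are all present, otherwise drop the partial group after a timeout". It is only an illustration of the requirement, not how Spring Integration implements its aggregator. The class name AxisGroupBuffer, the axis names and the choice to measure the timeout from the first reading of a group are assumptions of the sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical illustration: collects axis readings per device and drops stale partial groups. */
final class AxisGroupBuffer {

    private static final Set<String> REQUIRED_AXES = new HashSet<>(Arrays.asList("X", "Y", "Z"));

    /** Partial group for one device: the axes seen so far and when the first one arrived. */
    private static final class Group {
        final long firstArrivalMillis;
        final Map<String, Integer> axes = new HashMap<>();

        Group(long firstArrivalMillis) {
            this.firstArrivalMillis = firstArrivalMillis;
        }
    }

    private final long timeoutMillis;
    private final Map<String, Group> groups = new HashMap<>();

    AxisGroupBuffer(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /**
     * Adds one reading. Returns the completed axis map once X, Y and Z are all present,
     * otherwise Optional.empty(). Expiry is checked lazily, when the next reading arrives.
     */
    Optional<Map<String, Integer>> offer(String deviceId, String axis, int value, long nowMillis) {
        Group group = groups.get(deviceId);
        if (group != null && nowMillis - group.firstArrivalMillis > timeoutMillis) {
            groups.remove(deviceId); // timed out: discard the partial group
            group = null;
        }
        if (group == null) {
            group = new Group(nowMillis);
            groups.put(deviceId, group);
        }
        group.axes.put(axis, value);
        if (group.axes.keySet().containsAll(REQUIRED_AXES)) {
            groups.remove(deviceId); // complete: release and start afresh
            return Optional.of(group.axes);
        }
        return Optional.empty();
    }
}
```

The timestamp is passed in explicitly so the behaviour can be checked deterministically. A real aggregator would normally use a scheduler rather than waiting for the next message to notice an expired group.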

My working code:

Configuration:

@SpringBootApplication
public class MqttListenerApplication {

    public static void main(String[] args) {
        SpringApplication.run(MqttListenerApplication.class, args);
    }

    @Bean
    public MessageChannel mqttInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel filterOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MessageChannel aggregatorOutputChannel() {
        return new DirectChannel();
    }

    @Bean
    public MqttPahoClientFactory clientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setUserName("demo:application");
        options.setPassword("PwdApps".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MessageProducer inbound() {
        MqttPahoMessageDrivenChannelAdapter adapter =
                new MqttPahoMessageDrivenChannelAdapter(
                        "tcp://mqtt2.thingsplay.com:1883", "test-007", clientFactory(), "#");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        adapter.setOutputChannel(mqttInputChannel());
        return adapter;
    }
}

Filter endpoint:

@MessageEndpoint
public class MqttFilter {

    @Filter(
            inputChannel = "mqttInputChannel",
            outputChannel = "filterOutputChannel"
    )
    public boolean isValid(@Header("mqtt_receivedTopic") String topic, Message<?> message) {
        if (topic.contains("testbw")) {
            System.out.println("------ Valid Message ! ------");
            return true;
        } else {
            return false;
        }
    }

}

Aggregator endpoint:

@MessageEndpoint
public class GyroAggregator {

    private static final Logger logger = LogManager.getLogger();

    @Aggregator(
            inputChannel = "filterOutputChannel",
            outputChannel = "aggregatorOutputChannel"
    )
    public GyroCompleted aggregate(List<Message<?>> messages) {
        GyroCompleted gyroCompleted = new GyroCompleted();
        for (Message<?> message : messages) {
            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
            if (topic.contains("ACCX")) {
                gyroCompleted.setAcc_x(Integer.valueOf((String) message.getPayload()));
            } else if (topic.contains("ACCY")) {
                gyroCompleted.setAcc_y(Integer.valueOf((String) message.getPayload()));
            } else if (topic.contains("ACCZ")) {
                gyroCompleted.setAcc_z(Integer.valueOf((String) message.getPayload()));
            }
        }
        return gyroCompleted;
    }

    @ReleaseStrategy
    public boolean hasAllAxes(List<Message<?>> messages) {
        logger.debug("In Release Strategy method.");
        logger.debug(messages);
        boolean x = false, y = false, z = false;
        for (Message<?> message : messages) {
            String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
            if (topic.contains("ACCX")) {
                x = true;
            } else if (topic.contains("ACCY")) {
                y = true;
            } else if (topic.contains("ACCZ")) {
                z = true;
            }
        }
        logger.debug("Release Strategy method returning {}", x && y && z);
        return x && y && z;
    }

    @CorrelationStrategy
    public String correlateBy(@Header("mqtt_receivedTopic") String topic, Message<?> message) {
        logger.debug("In Correlation Strategy method.");
        String deviceId = topic.substring(0, topic.indexOf("/"));
        logger.debug("Correlation Strategy returning Key : {}", deviceId);
        return deviceId;
    }

}
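One detail worth noting in correlateBy: topic.indexOf("/") returns -1 when the topic contains no slash, and substring(0, -1) then throws a StringIndexOutOfBoundsException. Because the adapter above subscribes to "#", such topics could reach the aggregator if the filter lets them through. A minimal sketch of a more defensive key extraction follows; the helper name TopicKeys, the example topics and the fallback of using the whole topic as the key are assumptions, not part of the original code.

```java
/** Hypothetical helper: derives the correlation key (device id) from an MQTT topic. */
final class TopicKeys {

    private TopicKeys() {
    }

    /** Returns the part of the topic before the first '/', or the whole topic if there is none. */
    static String deviceKey(String topic) {
        int slash = topic.indexOf('/');
        return slash < 0 ? topic : topic.substring(0, slash);
    }

    public static void main(String[] args) {
        System.out.println(deviceKey("device42/gyro/ACCX"));
        System.out.println(deviceKey("device42"));
    }
}
```

Whether falling back to the whole topic is the right choice depends on the topic layout; rejecting such messages in the filter would be an equally reasonable design.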

Echo endpoint:

@MessageEndpoint
public class EchoServiceActivator {

    private static final Logger logger = LogManager.getLogger();

    @ServiceActivator(
            inputChannel = "aggregatorOutputChannel"
    )
    public void echo(Message<?> message) {
        logger.debug("Echo : " + message);
    }

}

But I cannot get the group timeout to work. There is no way to configure it through the annotation, even though the documentation says:

All of the configuration options provided by the xml element are also available for the @Aggregator annotation.

But a few lines further down it says:

Annotation configuration (@Aggregator and others) for the Aggregator component covers only simple use cases, where most default options are sufficient. If you need more control over those options using Annotation configuration, consider using a @Bean definition for the AggregatingMessageHandler and mark its @Bean method with @ServiceActivator

The problem is that I could not get the @Bean to work...

AggregatingMessageHandler

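I tried putting it in a class annotated with @MessageEndpoint, but that did not work either. I expected it to autowire all of the aggregator's components.

How can I make this work?

Best answer

It is easier with the Java DSL. Something like this: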

@Bean
public IntegrationFlow aggregatorFlow(GyroAggregator agg) {
    return IntegrationFlows.from("filterOutputChannel")
            .aggregate(a -> a
                    .processor(agg)
                    .groupTimeout(500L))
            .channel("aggregatorOutputChannel")
            .get();
}

Of course, you can wire the MQTT adapter and the filter into the same flow.

If you want to define the handler as a @Bean, use new SimpleMessageStore() in the constructor.
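A rough sketch of what such a @Bean definition might look like is shown below. It is a configuration fragment under assumptions rather than a verified setup: the class name AggregatorConfig and the bean method name are made up, the 500 ms value is copied from the DSL example, and it assumes Spring Integration is on the classpath, that GyroAggregator lives in the same package, and that the inputChannel/outputChannel wiring has been removed from its @Aggregator annotation so that only one consumer is subscribed to filterOutputChannel. The setters used should be checked against the Spring Integration version in use.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.aggregator.AggregatingMessageHandler;
import org.springframework.integration.aggregator.MethodInvokingCorrelationStrategy;
import org.springframework.integration.aggregator.MethodInvokingMessageGroupProcessor;
import org.springframework.integration.aggregator.MethodInvokingReleaseStrategy;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.store.SimpleMessageStore;

@Configuration
public class AggregatorConfig { // hypothetical class name

    @Bean
    @ServiceActivator(inputChannel = "filterOutputChannel")
    public AggregatingMessageHandler gyroAggregatingHandler(GyroAggregator agg) {
        // Reuse the existing POJO methods; the message store is passed to the constructor.
        AggregatingMessageHandler handler = new AggregatingMessageHandler(
                new MethodInvokingMessageGroupProcessor(agg, "aggregate"),
                new SimpleMessageStore());
        handler.setCorrelationStrategy(new MethodInvokingCorrelationStrategy(agg, "correlateBy"));
        handler.setReleaseStrategy(new MethodInvokingReleaseStrategy(agg, "hasAllAxes"));
        handler.setOutputChannelName("aggregatorOutputChannel");

        // Group timeout in milliseconds (assumed value, taken from the DSL example).
        handler.setGroupTimeoutExpression(new ValueExpression<>(500L));
        // Discard incomplete groups instead of sending a partial GyroCompleted.
        handler.setSendPartialResultOnExpiry(false);
        // Remove the timed-out group so later messages start a new one.
        handler.setExpireGroupsUponTimeout(true);
        return handler;
    }
}
```

With setSendPartialResultOnExpiry(false) the messages of a timed-out group go to the discard channel if one is configured; the sketch leaves that at its default.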

Regarding "java - How do I use a group timeout with Java configuration of an Aggregator?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/51654331/
