gpt4 book ai didi

java - 使用 Spring Integration MQTT 通过相同连接发布和订阅

转载 作者:塔克拉玛干 更新时间:2023-11-02 20:23:45 25 4
gpt4 key购买 nike

由于 MQTT 的设计,您只能与唯一的客户端 id 建立连接,是否可以使用相同的连接在 Spring Framework/Boot 中使用 Integration 进行发布和订阅?

以这个非常简单的例子为例,它会连接到 MQTT broker 来订阅和获取消息,但是如果你想发布消息,第一次连接会断开并在消息发送后重新连接。

@Bean
public MqttPahoClientFactory mqttClientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs("tcp://localhost:1883");
factory.setUserName("guest");
factory.setPassword("guest");
return factory;
}

// publisher

@Bean
public IntegrationFlow mqttOutFlow() {
return IntegrationFlows.from(CharacterStreamReadingMessageSource.stdin(),
e -> e.poller(Pollers.fixedDelay(1000)))
.transform(p -> p + " sent to MQTT")
.handle(mqttOutbound())
.get();
}

@Bean
public MessageHandler mqttOutbound() {
MqttPahoMessageHandler messageHandler = new MqttPahoMessageHandler("siSamplePublisher", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("siSampleTopic");
return messageHandler;
}

// consumer

@Bean
public IntegrationFlow mqttInFlow() {
return IntegrationFlows.from(mqttInbound())
.transform(p -> p + ", received from MQTT")
.handle(logger())
.get();
}

private LoggingHandler logger() {
LoggingHandler loggingHandler = new LoggingHandler("INFO");
loggingHandler.setLoggerName("siSample");
return loggingHandler;
}

@Bean
public MessageProducerSupport mqttInbound() {
MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",
mqttClientFactory(), "siSampleTopic");
adapter.setCompletionTimeout(5000);
adapter.setConverter(new DefaultPahoMessageConverter());
adapter.setQos(1);
return adapter;
}

如果您需要在发布消息后等待答案/结果,那么使用 2 个单独的连接会变得困难...

最佳答案

the first connection will disconnect and re-connect after the message is sent.

不知道你的意思是什么;两个组件都将保持打开持久连接。

由于工厂不连接客户端,而适配器连接,因此它不是为使用共享客户端而设计的。

使用单个连接并不能真正帮助协调请求/回复,因为回复仍会在另一个线程上异步返回。

如果您在请求/回复中有一些数据可用于关联对请求的回复,则可以使用 BarrierMessageHandler 来执行该任务。参见 my answer here举个例子;它使用标准相关 ID header ,但 MQTT 无法做到这一点,您需要在消息中包含一些内容。

关于java - 使用 Spring Integration MQTT 通过相同连接发布和订阅,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48443425/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com