gpt4 book ai didi

java - Spring MVC + Mosquitto + MQTT 集成无法获取任何消息

转载 作者:行者123 更新时间:2023-11-30 08:12:22 27 4
gpt4 key购买 nike

使用 Spring 的集成库,我正在尝试连接到 mosquitto 并读取/发送消息...但是有些事情我无法弄清楚。

1 - 启动应用程序时,应用程序连接到 mosquitto,但 mosquitto 在几秒钟内再次收到来自具有相同 ID 的同一应用程序的数百个连接请求。这是日志的例子:

New connection from 127.0.0.1 on port 1555.
Client springClient already connected, closing old connection.
Client springClient disconnected.
New client connected from 127.0.0.1 as springClient (c1, k60).
Sending CONNACK to springClient (0, 0)
Received SUBSCRIBE from springClient
0001/001/INF (QoS 1)
springClient 1 0001/001/INF
Sending SUBACK to springClient
New connection from 127.0.0.1 on port 1555.
Client springClient already connected, closing old connection.
Client springClient disconnected.

2 - 使用此配置我无法从 mosquitto 获取任何消息:

Spring 的XML:

<!-- This is for reading messages -->
<bean id="mqttInbound" class="com.mobistech.drc.m2mproject.mqtt.MqttCustomInboundAdapter">
<beans:constructor-arg name="clientId" value="springClient" />
<beans:constructor-arg name="clientFactory" ref="clientFactory" />
<beans:constructor-arg name="topic" value="0001/001/INF" />
<beans:property name="autoStartup" value="true"></beans:property>
<beans:property name="outputChannel" ref="fromBrokerChannel"></beans:property>
</bean>

<int:channel id="fromBrokerChannel" />

自定义适配器:

public class MqttCustomInboundAdapter extends MqttPahoMessageDrivenChannelAdapter {

public MqttCustomInboundAdapter(String clientId,
MqttPahoClientFactory clientFactory, String[] topic) {
super(clientId, clientFactory, topic);
// TODO Auto-generated constructor stub
}

@Override
public void messageArrived(String topic, MqttMessage message) throws Exception
{
super.messageArrived(topic, message);
System.out.println("**************** Message from topic : " + topic);
System.out.println("**************** Message : " + new String(message.getPayload()));
}

public void addTopicIfNotExists(String topic)
{
for(String topicName:getTopic())
{
if(topicName.equals(topic))return;
}

addTopic(topic);

System.out.println("************* Added Topic : " + topic);

for(String topicName:getTopic())
{
System.out.println(topicName);
}
}
}

我没有使用 service-activator,因为我需要知道到达的消息是从哪个主题发送的,所以我包装了 MqttPahoMessageDrivenChannelAdapter,正如它在 Spring Integration Docs 中提到的那样

那么有什么建议吗?

最佳答案

我设法用 java config 配置了 mqtt

@Bean
public MqttPahoMessageDrivenChannelAdapter mqttInbound() {

MqttPahoMessageDrivenChannelAdapter mqtt = new MqttPahoMessageDrivenChannelAdapter( applicationName + "-sub", clientFactory( ), "/#" );
mqtt.setQos( 2 );
mqtt.setOutputChannel( outbount( ) );
mqtt.setAutoStartup( true );
mqtt.setTaskScheduler( taskScheduler( ) );

return mqtt;
}

@Bean
public MqttPahoMessageHandler mqqtMessageHandler() {

return new MqttPahoMessageHandler( applicationName + "-pub", clientFactory( ) );
}

@Bean
public DefaultMqttPahoClientFactory clientFactory() {

DefaultMqttPahoClientFactory clientFactory = new DefaultMqttPahoClientFactory( );
clientFactory.setUserName( "test" );
clientFactory.setPassword( "test" );
clientFactory.setServerURIs( new String[] { "tcp://url:1883" } );
return clientFactory;
}

@Bean
public PublishSubscribeChannel outbount() {

PublishSubscribeChannel psc = new PublishSubscribeChannel( );
psc.subscribe( new MessageHandler( ) {

@Override
public void handleMessage( Message<?> message ) throws MessagingException {

logger.warn( message );

}
} );

return psc;
}

要发送消息,请添加以下内容:

@Autowired
MqttPahoMessageHandler mqtt;

@RequestMapping( "/" )
public ModelAndView getHomePage() throws MqttPersistenceException, MqttException {

Message<String> message = MessageBuilder.withPayload( "spring - test" ).setHeader( MqttHeaders.TOPIC, "/topic" ).build( );

mqtt.handleMessage( message );

return new ModelAndView( "home" );
}

关于java - Spring MVC + Mosquitto + MQTT 集成无法获取任何消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30537014/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com