- iOS/Objective-C 元类和类别
- objective-c - -1001 错误,当 NSURLSession 通过 httpproxy 和/etc/hosts
- java - 使用网络类获取 url 地址
- ios - 推送通知中不播放声音
我正在学习如何使用 Java Spring 框架并开始试验 Spring Integration。我正在尝试使用 Spring Integration 将我的应用程序连接到 MQTT 代理以发布和订阅消息,但我无法找到手动将消息发布到出站 channel 的方法。如果可能的话,我想专门使用 java 代码中的符号而不是定义 bean 和其他相关配置的 xml 文件来构建它。
在每个示例中,我都看到手动发布消息的解决方案似乎是使用 MessagingGateway 接口(interface),然后使用 SpringApplicationBuilder 获取 ConfigurableApplicationContext 以在 main 方法中获取对网关接口(interface)的引用。该引用然后用于发布消息。是否可以将 AutoWired 用于接口(interface)?在我的尝试中,我只得到一个 NullPointer。
我的目标是构建一个游戏,在其中我订阅一个主题以获取游戏消息,然后每当用户准备好采取下一步行动时,向该主题发布一条新消息。
更新:这是我一直在研究如何设置出站 channel 的示例之一:https://docs.spring.io/spring-integration/reference/html/mqtt.html
在 Gary Russel 回答后更新 2:
这是我在查看示例后编写的一些示例代码,这些示例在 Controller.java 中运行 gateway.sendToMqtt 时将 @AutoWired 用于网关时让我得到一个 NullPointer。我在这里想要实现的是在 Controller 处理 GET 请求时手动发送一条 mqtt 消息。
应用程序.java
@SpringBootApplication
public class Application {
public static void main(String[] args){
SpringApplication.run(Application.class, args);
}
}
Controller .java
@RestController
@RequestMapping("/publishMessage")
public class Controller {
@Autowired
static Gateway gateway;
@RequestMapping(method = RequestMethod.GET)
public int request(){
gateway.sendToMqtt("Test Message!");
return 0;
}
}
MqttPublisher.java
@EnableIntegration
@Configuration
public class MqttPublisher {
@Bean
public MqttPahoClientFactory mqttClientFactory(){
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setServerURIs("tcp://localhost:1883");
return factory;
}
@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound(){
MqttPahoMessageHandler messageHandler =
new MqttPahoMessageHandler("clientPublisher", mqttClientFactory());
messageHandler.setAsync(true);
messageHandler.setDefaultTopic("topic");
return messageHandler;
}
@Bean
public MessageChannel mqttOutboundChannel(){
return new DirectChannel();
}
@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface Gateway {
void sendToMqtt(String data);
}
}
更新:
不确定这是否是正确的日志记录,但这是我通过添加得到的:
logging.level.org.springframework.web=Debug
logging.level.org.hibernate=Error
到 application.properties。
最佳答案
使用 Messaging Gateway或者简单地向 channel 发送消息。
编辑
@SpringBootApplication
public class So47846492Application {
public static void main(String[] args) {
SpringApplication.run(So47846492Application.class, args).close();
}
@Bean
public ApplicationRunner runner(MyGate gate) {
return args -> {
gate.send("someTopic", "foo");
Thread.sleep(5_000);
};
}
@Bean
@ServiceActivator(inputChannel = "toMqtt")
public MqttPahoMessageHandler mqtt() {
MqttPahoMessageHandler handler = new MqttPahoMessageHandler("tcp://localhost:1883", "foo",
clientFactory());
handler.setDefaultTopic("myTopic");
handler.setQosExpressionString("1");
return handler;
}
@Bean
public MqttPahoClientFactory clientFactory() {
DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
factory.setUserName("guest");
factory.setPassword("guest");
return factory;
}
@Bean
public MqttPahoMessageDrivenChannelAdapter mqttIn() {
MqttPahoMessageDrivenChannelAdapter adapter =
new MqttPahoMessageDrivenChannelAdapter("tcp://localhost:1883", "bar", "someTopic");
adapter.setOutputChannelName("fromMqtt");
return adapter;
}
@ServiceActivator(inputChannel = "fromMqtt")
public void in(String in) {
System.out.println(in);
}
@MessagingGateway(defaultRequestChannel = "toMqtt")
public interface MyGate {
void send(@Header(MqttHeaders.TOPIC) String topic, String out);
}
}
关于java - Spring Integration 手动发布消息到 channel ,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47846492/
我目前是一群学生的团队负责人,他们正在为一门类(class)的项目工作,该类(class)目前由电气和计算机工程专业的学生组成。我是一名电气工程专业的学生,我还没有开始研究该项目的软件组件。我觉得
我们对 spring-integration 非常满意,除非事情没有按预期工作。然后真的很难找出发生了什么(我们使用的是xml配置)。有人可以将我指向 spring 集成组件背后的 java 组件以便
我需要评估几个积分,我正在使用正常 (0,1) 密度来测试。 在 python 中 import scipy.integrate as integrate import scipy.stats imp
我想保留原始请求的原始有效负载并将其放在 xslt-transformer 或其他操作中。我丢失了它,因为我使用了 xslt-transformer,并且我只需要转换中的一些元素。所以我的场景是: 1
我想知道在 Spring Integration 中使消息不可变的原因是什么。 仅仅是因为多线程环境中的线程安全吗? 表现?当您每次要向现有消息添加某些内容时都必须创建新消息时,您不会受到性能惩罚吗?
我有一个偶尔会返回 503 错误的 http 网关调用。我想配置 retry advice围绕那个调用,但我不想为每个错误都这样做,只是 503s。 我已经配
我们正在使用 Spring Integration 4.2.3 聚合器组件和定义的组超时,并期望组在给定的超时值内超时,同时向组添加消息和发布大小标准不满足。 但我们看到了不同的结果,当我们向服务输入
我需要轮询邮件服务器。由于我的项目已经在 Spring 中,我使用 Spring-Integration 来轮询邮件服务器。我在这方面很成功。但现在我必须轮询多封电子邮件。有人可以告诉我该怎么做吗。
现在,我正在从事的项目已经达到了一个复杂的水平,需要完成多个步骤(实际上,它变得不可思议!)才能生产出完整/可用的产品。不幸的是,我们并不是从Continuos Integration的心态开始的,所
哪些指标表明应该使用企业集成模式框架?另一方面,哪些指标表明应该坚持使用简单的旧代码进行逻辑流? 就我而言,我们将 Spring Integration 应用于映射/处理应用程序,该应用程序从数据库读
我们在 XML 中有以下工作配置,并正在尝试转换为 DSL。不确定它们是否等效,也尝试使用 inboundAdapter。但是,我无法弄清楚如何在那里设置与并发相关的值。有人可以建议他们是否在 DSL
所以我在玩这个: factors :: Integral a => a -> [a] factors n = filter (\d -> n `rem` d == 0) . takeWhile (\d
我是 Spring 集成的新手,正在尝试建模一个流程,其中我通过 HTTP 进行同步请求和响应,但也是交付的同一流程的一部分将响应发送到队列,对其进行后处理,并让一个单独的进程使用该响应。所以从调用流
我有一个 Spring Integration Flow 项目,它公开了一个 Rest 网关,在收到 Rest POST 请求后,它会执行一些小逻辑。基于一些有效负载参数,我想动态激活另一个 Spri
我浏览了 Internet,在 Spring 论坛上发帖,并阅读了几乎全部在线文档,但我无法弄清楚 Spring Integration 是否可以在单个多资源 (JTA) 事务中处理多个消息。这对于我
我正在查看 spring-projects/spring-integration-samples 中的聚合器示例。 https://github.com/spring-projects/spring-
我正在查看 spring-projects/spring-integration-samples 中的聚合器示例。 https://github.com/spring-projects/spring-
我有一个 spring-integration接受 org.w3c.dom.Document 并返回域对象的转换器。这很好。如果缺少元素,我会引发应用程序异常。 但是,我想将该异常放到错误 chann
我显然已经通读了 documentation , 但我无法找到更详细的幕后情况描述。具体来说,有几个行为我很疑惑: 一般设置 import numpy as np from scipy.integra
我正在使用 Spring Integration 使用以下配置从目录中读取文件。但是,我希望在找到任何文件后停止轮询,直到服务不再重新启动为止。有什么方法可以在运行时更改轮询器延迟或在运行时启动/停止
我是一名优秀的程序员,十分优秀!