- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我尝试通过设置轮询器定期从队列中轮询 (JSON) 消息并处理消息然后保存到我的数据库来实现 Spring IntegrationFlow 以使用 AWS SQS 队列。
我成功地从队列中轮询单个 JSON 消息模式并转换为我的自定义对象。现在我有 2 种类型的 JSON 模式发送到同一个 SQS 队列。例如,
`"Type" : "Notification",
"MessageId" : "xxxx-xxxx-xxxx",
"TopicArn" : "arn:aws:sns:us-west-2:xxxxx:topicName00",
"Subject" : "OK: \"test00\" in US-West-2",
"Message" : "{\"AlarmName\":\"test00\..."`
和
`"Type" : "Notification",
"MessageId" : "xxxxx-xxxx-xxxxx",
"TopicArn" : "arn:aws:sns:us-west-2:xxxxxx:topicName01",
"Message" : "{\"version\":\"0\",\"id\":\"xxxxx\",\"detail-type\":\"EC2 Instance State-change Notification\",\"source\":\"aws.ec2\..."`
这些消息被发送到同一个队列,我想使用同一个轮询器轮询队列,然后根据消息正文将消息路由到不同的转换器和 serviceActivator(handle)。
@Bean
public IntegrationFlow sqsIntegrationFlow()
{
return IntegrationFlows.from( this.sqsMessageSource(), c -> c.poller( myPoller() ) )
.channel( new DirectChannel() )
.<Payload,Boolean>route( input -> input.value().contains( "EC2 Instance State-change Notification" ),
mapping -> mapping
.subFlowMapping( "true", sf -> sf.channel( new DirectChannel() )
.transform(
SqsMessageToInstanceConverter::convertSqsMessagesToInstanceInfo )
.channel( new DirectChannel() ).handle( ( message ) -> {
ec2InstanceService.updateInstanceInfo( (List<SqsMessageResult>) message.getPayload() );
} ) )
.subFlowMapping( "false", sf -> sf.channel( new DirectChannel() )
.transform( SqsMessageToInstanceConverter::convertSqsMessageToAlarmInfo )
.channel( new DirectChannel() ).handle( (alarm -> {
cwAlarmService.updateAlarmInfo(
(List<SqsAlarmMessageResult>) alarm.getPayload() );
}) ) ) )
.get();
}
我尝试如上所述使用路由器,并使用消息正文中的字符串(“EC2 实例状态更改通知”)识别消息,但出现错误
java.lang.ClassNotFoundException:org.springframework.integration.support.management.MappingMessageRouterManagement
我的问题是:
1.这是路由器的正确使用方式吗?
2。如何实现使用集成流程处理 2 条不同 JSON 消息的目标?
最佳答案
是的,它是正确的(我大约一个小时前写了一个类似的流程)。看起来你有某种类路径问题 - 该接口(interface)与路由器位于同一个 jar 中。您在哪里运行您的应用程序?
尝试使用 -verbose
JVM arg 运行,我刚刚做了并得到了这个...
...
[Loaded org.springframework.integration.router.AbstractMessageRouter from file:/Users/.../.m2/repository/org/springframework/integration/spring-integration-core/4.2.5.RELEASE/spring-integration-core-4.2.5.RELEASE.jar]
[Loaded org.springframework.integration.support.management.MappingMessageRouterManagement from file:/Users/.../.m2/repository/org/springframework/integration/spring-integration-core/4.2.5.RELEASE/spring-integration-core-4.2.5.RELEASE.jar]
...
关于java - 用于多个 JSON 对象模式的 Spring IntegrationFlow,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/37336196/
我有一些 XML,我正在尝试通过集成流程获取这些 XML。以下是我正在执行的步骤: 如果消息包含 XML 节点:“filterMe”且类型 =“filterType”,则将其过滤掉 - 如果是这样,我
我有几个@Configuration类,其中包括由@Bean注释的高度复杂的IntegrationFlow。我想对这些流程中使用的每个组件进行单元测试。例如: @Configuration publi
我在为使用 Spring Integration DSL 的 IntegrationFlow 编写测试用例时遇到了麻烦。下面是我的代码片段,我想测试“转换”部分。请提供一些模拟 handle 部分的帮
我一直在研究 Spring Integration (SI) DSL。我有一个定义了以下异步网关的 Rest 服务: @MessagingGateway public interface Provis
我目前有一个 IntegrationFlow 实现,它利用 Service 类来实现流要执行的所有所需功能。像这样的事情... @Service public class FlowService {
我一直在使用 Spring Integration DSL 来实现一些消息处理流程。 我如何才能真正对单个 IntegrationFlow 进行单元测试,谁能提供一个关于如何单元测试的示例,即转换这个
我有一个 spring 集成 IntegrationFlow ,其定义如下: IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory,
我的集成流程定义如下: IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
我有一个 Spring Integration Batch 作业,当文件到达时会触发该作业: @Bean public IntegrationFlow fileTriggeredIntegration
我收到来自客户端的请求,该请求返回一个 SendRequest-Object,该对象具有 HttpMethod、要发送的路径和数据。现在我想根据我获得 API 的对象发送请求。 发送后我会收到回复。
我尝试通过设置轮询器定期从队列中轮询 (JSON) 消息并处理消息然后保存到我的数据库来实现 Spring IntegrationFlow 以使用 AWS SQS 队列。 我成功地从队列中轮询单个 J
我在测试使用 Spring Integration 和 Spring Integration DSL 的应用程序时遇到问题。当我运行我的应用程序时,流设置正确,没有问题,但是对于我的测试,我想隔离某些
我在 Spring 集成中创建了一些 IntegrationFlow,如下所示: IntegrationFlows.from(..).id("test").autoStartup(false). ..
我的集成流程定义如下: IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
我使用 Spring Integration DSL 的速度太快了。我正在玩下面的例子。 @Bean public IntegrationFlow flow() { retu
我有一个将文件传输到动态创建的某些子目录下的 sftp 的用例。我使用自定义 SftpMessageHandler 方法和网关来完成这项工作。但这种方法的问题是,它在成功上传后并没有删除本地临时文件。
我是一名优秀的程序员,十分优秀!