gpt4 book ai didi

java - 是否有任何开箱即用的 spring ingetration 模式用于使用动态队列中的消息并处理它们?

转载 作者:行者123 更新时间:2023-11-30 09:59:12 26 4
gpt4 key购买 nike

我正在尝试使用 spring 集成实现一个场景,它必须处理动态生成的 redis 队列。到目前为止,我在 Internet 上找到的示例适用于预定义的队列。

在我的情况下,有超过 100 个 redis 队列由应用程序动态生成,我的代码将使用来自这些队列的消息。我已经成功地创建了一个 POC 类型的项目(github link)并且它正在运行。

我想知道是否有更好的方法来实现同样的目标。据我所知,企业集成模式没有说明如何从多个动态队列或消息源中消费消息,除了自定义或更改现有框架源代码之外,是否有任何开箱即用的解决方案?

相比 xml,我更喜欢使用 Spring Integration Java 配置和 DSL。

最佳答案

参见 Dynamic and Runtime Integration Flows .

To simplify the development experience, Spring Integration introduced IntegrationFlowContext to register and manage IntegrationFlow instances at runtime, as the following example shows:

@Autowired
private AbstractServerConnectionFactory server1;

@Autowired
private IntegrationFlowContext flowContext;

...

@Test
public void testTcpGateways() {
TestingUtilities.waitListening(this.server1, null);

IntegrationFlow flow = f -> f
.handle(Tcp.outboundGateway(Tcp.netClient("localhost", this.server1.getPort())
.serializer(TcpCodecs.crlf())
.deserializer(TcpCodecs.lengthHeader1())
.id("client1"))
.remoteTimeout(m -> 5000))
.transform(Transformers.objectToString());

IntegrationFlowRegistration theFlow = this.flowContext.registration(flow).register();
assertThat(theFlow.getMessagingTemplate().convertSendAndReceive("foo", String.class), equalTo("FOO"));
}

This is useful when we have multiple configuration options and have to create several instances of similar flows. To do so, we can iterate our options and create and register IntegrationFlow instances within a loop. Another variant is when our source of data is not Spring-based and we must create it on the fly. ...

编辑

@SpringBootApplication
public class So59117728Application {

public static void main(String[] args) {
SpringApplication.run(So59117728Application.class, args).close();
}

@Bean
public ApplicationRunner runner(RedisConnectionFactory cf, IntegrationFlowContext context,
RedisTemplate<String, String> template) {

return args -> {
IntegrationFlow flow = IntegrationFlows
.from(redisEndpoint("So59117728Application", cf))
.handle(System.out::println)
.get();
context.registration(flow).id("myDynamicFlow").register();
template.boundListOps("So59117728Application").leftPush("foo");

Thread.sleep(10_000);
context.remove("myDynamicFlow");
};
}

private RedisQueueMessageDrivenEndpoint redisEndpoint(String queueName, RedisConnectionFactory cf) {
RedisQueueMessageDrivenEndpoint endpoint = new RedisQueueMessageDrivenEndpoint(queueName, cf);
endpoint.setSerializer(new StringRedisSerializer());
return endpoint;
}

}

关于java - 是否有任何开箱即用的 spring ingetration 模式用于使用动态队列中的消息并处理它们?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59117728/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com