gpt4 book ai didi

spring-integration - 分解 DSL IntegrationFlows

转载 作者:行者123 更新时间:2023-12-04 00:12:51 27 4
gpt4 key购买 nike

我一直在研究 Spring Integration (SI) DSL。我有一个定义了以下异步网关的 Rest 服务:

@MessagingGateway
public interface Provision {
@Async
@Gateway(requestChannel = "provision.input")
ListenableFuture<List<ResultDto>> provision(List<ItemsDto> stuff);
}

在逐行演练中,我有以下示例 IntegrationFlow。

@Bean
public IntegrationFlow provision() {
return f -> f
.split(ArrayList.class, List::toArray)
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.<ItemsDto, String>route(ItemsDto::getType, m -> m
.subFlowMapping("IPTV", sf -> sf
.<ItemsDto, String>route(ItemsDto::getAction, m2 -> m2
.subFlowMapping("OPEN", sf2 -> sf2
.handle((p, h) -> iptvService.open((ItemsDto) p))))
)
)
.aggregate();
}

如您所见,我有多层路由。我需要把事情分解一下。我已经尝试了几种不起作用的方法(在这里我没有得到响应......线程不等待):

@Bean(name = "routerInput")
private MessageChannel routerInput() {
return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow provision() {
return f -> f
.split(ArrayList.class, List::toArray)
.channel(c -> c.executor(Executors.newCachedThreadPool()))
.<ItemsDto, String>route(ItemsDto::getType, m ->
m.subFlowMapping("IPTV", sf -> sf.channel("routerInput"))
)
.aggregate();
}

@Bean
public IntegrationFlow action() {
return IntegrationFlows.from("routerInput")
.<ItemsDto, String>route(ItemsDto::getAction, m -> m
.subFlowMapping("OPEN", sf -> sf
.handle(p -> iptvService.open((ItemsDto) p.getPayload())))).get();
}

我显然在概念上遗漏了一些东西 :) 有人可以提供“如何以及为什么”的意见吗?

我有一个需要拆分的项目列表,按“类型”路由,然后按“操作”路由,最后聚合(包含处理程序的响应)。每个处理的项目都需要并行处理。

提前致谢

更新:根据 Artem 的建议,我删除了所有异步内容。我把它修剪成几乎没有...

@Bean(name = "routerInput")
private MessageChannel routerInput() {
return MessageChannels.direct().get();
}

@Bean
public IntegrationFlow provision() {
return f -> f
.split()
.<ItemDto, String>route(ItemDto::getType, m ->
m.subFlowMapping("IPTV", sf -> sf.channel("routerInput")))
.aggregate();
}

@Bean
public IntegrationFlow action() {
return IntegrationFlows.from("routerInput")
.<ItemDto, String>route(ItemDto::getAction, m -> m
.subFlowMapping("OPEN", sf -> sf
.handle((p, h) -> iptvService.open((ItemDto) p)))).get();
}

我让它通过改变来响应

.handle(p ->

对此

.handle((p, h) ->

所以它至少会响应,但不会聚合拆分的 3 个测试项目。输出包含 1 个项目。我需要使用流收集吗?发布政策?这不应该没问题吗?

最佳答案

如果你想把它拆开,使用 channelMapping 可能比 subflowMapping 更简单......

    @Bean
public IntegrationFlow typeRoute() {
return IntegrationFlows.from(foo())
.split()
.<ItemsDto, String>route(ItemsDto::getType, m -> m
.channelMapping("foo", "channel1")
.channelMapping("bar", "channel2"))
.get();
}

@Bean
public IntegrationFlow fooActionRoute() {
return IntegrationFlows.from(channel1())
.<ItemsDto, String>route(ItemsDto::getAction, m -> m
.channelMapping("foo", "channel3")
.channelMapping("bar", "channel4"))
.get();
}

@Bean
public IntegrationFlow barActionRoute() {
return IntegrationFlows.from(channel1())
.<ItemsDto, String>route(ItemsDto::getAction, m -> m
.channelMapping("foo", "channel5")
.channelMapping("bar", "channel6"))
.get();
}

@Bean
public IntegrationFlow fooFooHandle() {
return IntegrationFlows.from(channel3())
// handle
.channel(aggChannel())
.get();
}

为其他选项创建流程并聚合每个结果:

    // fooBarHandle(), barFooHandle(), barBarHandle()


@Bean IntegrationFlow agg() {
return IntegrationFlows.from(aggChannel())
.aggregate()
.get();
}

并行度是通过使用 ExecutorChannels...

    @Bean
public MessageChannel channel1() {
return new ExecutorChannel(exec());
}

@Bean
public MessageChannel channel2() {
return new ExecutorChannel(exec());
}

@Bean
public MessageChannel channel3() {
return new DirectChannel();
}

@Bean
public MessageChannel channel4() {
return new DirectChannel();
}

@Bean
public MessageChannel channel5() {
return new DirectChannel();
}

@Bean
public MessageChannel channel6() {
return new DirectChannel();
}

@Bean
public MessageChannel aggChannel() {
return new DirectChannel();
}

关于spring-integration - 分解 DSL IntegrationFlows,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/34706312/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com