gpt4 book ai didi

java - Spring集成DSL : PublishSubscribeChannel order

转载 作者:行者123 更新时间:2023-11-30 02:22:53 25 4
gpt4 key购买 nike

我想了解 PublishSubscribeChannel 的工作原理,因此我实现了一个小示例:

@Bean
public MessageSource<?> integerMessageSource() {
MethodInvokingMessageSource source = new MethodInvokingMessageSource();
source.setObject(new AtomicInteger());
source.setMethodName("getAndIncrement");
return source;
}



@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 1: {}", message.getPayload())))
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 2: {}", message.getPayload())))
.subscribe(flow -> flow
.transform(source -> MessageBuilder.withPayload("Error").build())
.handle(message -> {
LOG.info("Error");
}))
.subscribe(flow -> flow
.handle(message -> LOG.info("Handling message, step 4: {}", message.getPayload())))
)
.get();
// @formatter:on
}

我预计我会看到输出:

Handling message, step 1...
Handling message, step 2...
Error
Handling message, step 4...

但是第三个子流(带有“Error”输出)始终首先处理。当我尝试为步骤 1、2 和 4 定义顺序 I 时,我得到以下控制台输出(警告):

o.s.integration.dsl.GenericEndpointSpec  : 'order' can be applied only for AbstractMessageHandler

我本以为订阅者会按照订阅的顺序被调用,但事实似乎并非如此。

我正在使用 Spring Boot 1.5.4 和 Spring Integration 4.3.10。

最佳答案

问题在于 lambda 处理程序不是 Ordered - 发布/订阅 channel 的一般约定是首先(按顺序)调用 Ordered 订阅者,然后调用无序订阅者。

由于 lambda 无法实现多个接口(interface),我不确定我们能做些什么。

作为解决方法,您可以执行以下操作...

@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.handle(handler("Handling message, step 1: {}")))
.subscribe(flow -> flow
.handle(handler("Handling message, step 2: {}")))
.subscribe(flow -> flow
.transform(message -> "Error")
.handle(message -> {
LOG.info("Error");
}))
.subscribe(flow -> flow
.handle(handler("Handling message, step 4: {}")))
)
.get();
// @formatter:on
}

private MessageHandler handler(String format) {
return new AbstractMessageHandler() {

@Override
protected void handleMessageInternal(Message<?> message) throws Exception {
LOG.info(format, message.getPayload());
}

};

}

以便所有订阅者都有序

编辑

这里有一个稍微简单一点的解决方法 - 使用桥而不是 lambda 启动子流程,以便所有子流程的第一个组件实现 Ordered...

@Bean
public IntegrationFlow mainFlow() {
// @formatter:off
return IntegrationFlows
.from(integerMessageSource(), c -> c.poller(Pollers.fixedRate(1000)))
.publishSubscribeChannel(pubSub -> pubSub
.subscribe(flow -> flow
.bridge(e -> e.id("s1"))
.handle(message -> LOG.info("Handling message, step 1: {}", message.getPayload())))
.subscribe(flow -> flow
.bridge(e -> e.id("s2"))
.handle(message -> LOG.info("Handling message, step 2: {}", message.getPayload())))
.subscribe(flow -> flow
.transform(source -> MessageBuilder.withPayload("Error").build())
.handle(message -> {
LOG.info("Error");
}))
.subscribe(flow -> flow
.bridge(e -> e.id("s4"))
.handle(message -> LOG.info("Handling message, step 4: {}", message.getPayload())))
)
.get();
// @formatter:on
}

关于java - Spring集成DSL : PublishSubscribeChannel order,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46358840/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com