gpt4 book ai didi

java - 升级到 Spring Integration 5 后消费消息不再有效

转载 作者:行者123 更新时间:2023-12-02 01:31:49 25 4
gpt4 key购买 nike

我正在尝试将使用 Spring Integration 4.3 和 Spring Boot 1.6 的项目升级到 Spring Integration 5.1 和 Spring Boot 2.1。以前我有以下配置:

IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("myId")
.autoStartup(autoStartup)
.prefetchCount(10)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
.messageConverter(messageConverter()))
.aggregate(a -> a.correlationExpression("payload.entityId")
.releaseExpression("size() eq iterator().next().payload.batchSize")
.sendPartialResultOnExpiry(true)
.groupTimeout(2000)
.expireGroupsUponCompletion(true)
.outputProcessor(myMessageGroupProcessor))
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();

在升级过程中,我尝试遵循文档 here因此将配置更改为:

@Configuration
@EnableAutoConfiguration
@EnableIntegration
public class SpringConfig {

@Bean(name = "myFlowId")
public IntegrationFlow myFlow(ConnectionFactory connectionFactory, ServiceActivatorBean serviceActivatorBean,
@Value("${spring.integration.flow.auto-startup:true}") boolean autoStartup,
MyMessageGroupProcessor myMessageGroupProcessor) {
IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, "queueName")
.id("myId")
.autoStartup(autoStartup)
.configureContainer(c -> c.acknowledgeMode(MANUAL)
.prefetchCount(10)
.concurrentConsumers(2)
.maxConcurrentConsumers(3)
)
.messageConverter(messageConverter()))
.aggregate(a -> a.correlationExpression("payload.entityId")
.releaseExpression("size() eq one().payload.batchSize")
.sendPartialResultOnExpiry(true)
.groupTimeout(2000)
.expireGroupsUponCompletion(true)
.outputProcessor(myMessageGroupProcessor))
.handle(serviceActivatorBean, "myMethod", e -> e.advice(requestHandlerRetryAdviceForIntegrationFlow()))
.get();
}
}

但是当我发布消息时,集成流程似乎没有接收/处理它们。我没有收到任何错误日志(即使我启用了调试日志记录,也没有收到任何日志),而且我不太确定从哪里开始调试。我确信消息实际上已发布到 RabbitMQ,所以这不是问题。我可能会错过什么?

最佳答案

我的问题实际上不是由于 Spring Integration,而是与 Spring AMQP 的更改有关。以前“可声明”可以这样创建:

@Bean
List<Binding> myBinding() {
return List.of(<binding1>, <binding2>, ..)
}

但在 Spring AMQP 2.1 中应更改为:

@Bean
Declarables myBinding() {
return new Declarables(List.of(<binding1>, <binding2>, ..))
}

查看文档 here .

顺便说一句,我的releaseExpression也错了,应该是size() eq one.payload.batchSize

关于java - 升级到 Spring Integration 5 后消费消息不再有效,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55918779/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com