
redis - Spring Integration Redis poller with transactions

Reposted. Author: 可可西里. Updated: 2023-11-01 11:15:11

I am using Spring Integration Redis to poll messages from Redis, configured as follows:

@Bean
public PseudoTransactionManager transactionManager() {
    return new PseudoTransactionManager();
}

@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor transactionSynchronizationProcessor =
            new ExpressionEvaluatingTransactionSynchronizationProcessor();
    // SpEL expressions evaluated after the poll's transaction commits or rolls back
    transactionSynchronizationProcessor.setAfterCommitExpression(
            this.PARSER.parseExpression("#store.rename('commit')"));
    transactionSynchronizationProcessor.setAfterRollbackExpression(
            this.PARSER.parseExpression("#store.rename('roll')"));
    return new DefaultTransactionSynchronizationFactory(transactionSynchronizationProcessor);
}


@Bean
public SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapter(
        RedisStoreMessageSource redisStoreMessageSource,
        TransactionSynchronizationFactory transactionSynchronizationFactory) {

    SourcePollingChannelAdapterFactoryBean sourcePollingChannelAdapterFactoryBean =
            new SourcePollingChannelAdapterFactoryBean();
    sourcePollingChannelAdapterFactoryBean.setAutoStartup(true);
    sourcePollingChannelAdapterFactoryBean.setOutputChannelName("mail-delivery-status-route-channel");
    sourcePollingChannelAdapterFactoryBean.setSource(redisStoreMessageSource);

    // Poll every 2 seconds, up to 10 messages per poll
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setMaxMessagesPerPoll(10);
    pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory);
    PeriodicTrigger periodicTrigger = new PeriodicTrigger(2000);
    pollerMetadata.setTrigger(periodicTrigger);
    sourcePollingChannelAdapterFactoryBean.setPollerMetadata(pollerMetadata);

    return sourcePollingChannelAdapterFactoryBean;
}


@Bean
public TestHandler testHandler() {
    return new TestHandler();
}

@Bean
public IntegrationFlow trans() {
    return flow -> flow.channel("mail-delivery-status-route-channel").handle(testHandler());
}

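Normally, once the flow completes, the afterCommit expression #store.rename('commit') should run. It does not, and the poller keeps polling the same data. While debugging, I found that in AbstractPollingEndpoint#bindResourceHolderIfNecessary, TransactionSynchronizationManager.isActualTransactionActive() is always false. How can I fix this?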

Best answer

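pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory); alone is not enough. You also need to add an adviceChain to the PollerMetadata, and one of its advices should be a TransactionInterceptor. Without it, no transaction is started around the poll, so the synchronization factory has nothing to synchronize with. For convenience, see TransactionInterceptorBuilder.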
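As a rough illustration of that advice, the PollerMetadata might be built as in the sketch below. It assumes Spring Integration 5.x and reuses the PseudoTransactionManager and TransactionSynchronizationFactory beans from the question. The class name TransactionalPollerConfig and the bean name transactionalPoller are hypothetical, introduced only for this example.

```java
import java.util.Collections;

import org.aopalliance.aop.Advice;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.transaction.PseudoTransactionManager;
import org.springframework.integration.transaction.TransactionInterceptorBuilder;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.transaction.interceptor.TransactionInterceptor;

// Hypothetical configuration class; only the poller wiring is shown.
@Configuration
public class TransactionalPollerConfig {

    @Bean
    public PollerMetadata transactionalPoller(
            PseudoTransactionManager transactionManager,
            TransactionSynchronizationFactory transactionSynchronizationFactory) {

        // Advice that starts a transaction around each poll, so the
        // synchronization callbacks have a transaction to attach to.
        TransactionInterceptor txAdvice = new TransactionInterceptorBuilder()
                .transactionManager(transactionManager)
                .build();

        PollerMetadata pollerMetadata = new PollerMetadata();
        pollerMetadata.setMaxMessagesPerPoll(10);
        pollerMetadata.setTrigger(new PeriodicTrigger(2000));
        // The piece missing from the question: the transactional advice chain.
        pollerMetadata.setAdviceChain(Collections.<Advice>singletonList(txAdvice));
        pollerMetadata.setTransactionSynchronizationFactory(transactionSynchronizationFactory);
        return pollerMetadata;
    }
}
```

The resulting PollerMetadata would then be passed to sourcePollingChannelAdapterFactoryBean.setPollerMetadata(...) in place of the one built inline in the question.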

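That said, it is not clear why you use SourcePollingChannelAdapterFactoryBean manually when the project already has the Java DSL and an IntegrationFlow can handle all of that boilerplate for you. I mean, you should look into: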

/**
 * Populate the provided {@link MessageSource} object to the {@link IntegrationFlowBuilder} chain.
 * The {@link org.springframework.integration.dsl.IntegrationFlow} {@code startMessageSource}.
 * In addition use {@link SourcePollingChannelAdapterSpec} to provide options for the underlying
 * {@link org.springframework.integration.endpoint.SourcePollingChannelAdapter} endpoint.
 * @param messageSource the {@link MessageSource} to populate.
 * @param endpointConfigurer the {@link Consumer} to provide more options for the
 * {@link org.springframework.integration.config.SourcePollingChannelAdapterFactoryBean}.
 * @return new {@link IntegrationFlowBuilder}.
 * @see MessageSource
 * @see SourcePollingChannelAdapterSpec
 */
public static IntegrationFlowBuilder from(MessageSource<?> messageSource,
        Consumer<SourcePollingChannelAdapterSpec> endpointConfigurer) {

and configure .transactional() and .transactionSynchronizationFactory() on the PollerSpec.
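A minimal sketch of the DSL variant, assuming Spring Integration 5.x (where the factory class is IntegrationFlows) and the beans defined in the question, could look like this. The class name RedisPollingFlowConfig and the bean name redisPollingFlow are hypothetical. The flow only forwards to the existing channel, so the trans() flow from the question can keep consuming from it.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.redis.inbound.RedisStoreMessageSource;
import org.springframework.integration.transaction.PseudoTransactionManager;
import org.springframework.integration.transaction.TransactionSynchronizationFactory;

// Hypothetical configuration class replacing the manual factory bean.
@Configuration
public class RedisPollingFlowConfig {

    @Bean
    public IntegrationFlow redisPollingFlow(
            RedisStoreMessageSource redisStoreMessageSource,
            PseudoTransactionManager transactionManager,
            TransactionSynchronizationFactory transactionSynchronizationFactory) {

        return IntegrationFlows
                .from(redisStoreMessageSource,
                        e -> e.poller(Pollers.fixedDelay(2000)
                                .maxMessagesPerPoll(10)
                                // Adds the transactional advice to the poller.
                                .transactional(transactionManager)
                                // Registers the afterCommit / afterRollback expressions.
                                .transactionSynchronizationFactory(transactionSynchronizationFactory)))
                .channel("mail-delivery-status-route-channel")
                .get();
    }
}
```

Here .transactional(...) plays the role of the TransactionInterceptor in the advice chain, so the DSL form needs no explicit PollerMetadata.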

Regarding "redis - Spring Integration Redis poller with transactions", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/51301319/
