gpt4 book ai didi

java - Spring Cloud Stream RabbitMQ Binder - Spring Cloud函数错误处理

转载 作者:行者123 更新时间:2023-12-02 06:12:21 24 4
gpt4 key购买 nike

我正在使用 Spring Cloud Stream Rabbit Binder 和 Spring Cloud 函数并定义监听器,例如:

public Function<Flux<SomeObject>, Flux<OtherObject>> foo() {
//some code
}

我还将失败的消息重新路由到 DLQ。问题是当发生像 org.springframework.messaging.converter.MessageConversionException 这样的 fatal error 时。它不会像 https://docs.spring.io/spring-amqp/reference/html/#exception-handling 中提到的那样被 ConditionalRejectingErrorHandler 处理。 ,并永远循环下去。

有没有办法让这个工作与ConditionalRejectingErrorHandler一起工作?

现在,我通过使用 @ServiceActivator(inputChannel = "errorChannel") 并自行处理错误来修复问题。

依赖关系:

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.2.4.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<dependencies>
<dependency>
<groupId>org.springframework.boot.experimental</groupId>
<artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-consul-config</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-hateoas</artifactId>
<exclusions>
<exclusion>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-rabbit</artifactId>
</dependency>
</dependencies>

最佳答案

我们长期以来一直在争论用于命令式函数的错误处理和其他功能,以及它们如何应用于(或者甚至可以应用于)响应式(Reactive)函数,并尝试了一些不同的事情,但不幸的是,这一切都归结为阻抗不匹配。

您所描述的方法基于对单个消息的操作。这是命令式消息处理程序中的工作单元,例如 Function<String, String> 。您使用响应式(Reactive)风格,从而将工作单元从流中的单个消息更改为整个流。

简而言之:

- Function<?, ?> - unit of work is Message
- Function<Flux<?>, Flux<?>> - unit of work is the entire stream

您还可以轻松观察到它,因为响应式函数在应用程序的生命周期内仅调用一次,而命令式函数则针对每个到达的消息调用一次。我这么说的原因是,我们用于命令式消息处理程序(函数)的基于框架的方法不能应用于响应式消息处理程序而不引起副作用。一般响应式(Reactive)开发人员都理解这一点,特别是考虑到响应式(Reactive) API 的丰富性,特别是在错误处理方面

无论如何,我们都会相应地更新文档。

关于java - Spring Cloud Stream RabbitMQ Binder - Spring Cloud函数错误处理,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60434530/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com