gpt4 book ai didi

Spring Cloud Streams Error Handling Not Working(Spring Cloud Streams错误处理不起作用)

转载 作者:bug小助手 更新时间:2023-10-25 22:26:36 26 4
gpt4 key购买 nike



There are so few examples of different methods of error-handling in spring cloud streams, and the few that are provided partially via the documentation don't seem to work either.

在Spring Cloud Stream中,错误处理的不同方法的示例非常少,而且部分通过文档提供的少数几种方法似乎也不起作用。


I have a test repository with multiple methods of error capture attempted, and none of the methods work in any way.

我有一个测试存储库,其中尝试了多种错误捕获方法,但这些方法都不起作用。


Spring Cloud Streams has reliable deserialization and serialization error handling, but error handling from map, transform and processor methods are very under-documented.

Spring Cloud Streams有可靠的反序列化和序列化错误处理,但来自map、转换和处理器方法的错误处理文档非常少。


Repository for samples: https://github.com/StevenPG/scs-experimentation/tree/main/scs4-error-handling/error-handling

样本库:https://github.com/StevenPG/scs-experimentation/tree/main/scs4-error-handling/error-handling


I have only two main files

我只有两个主要文件


@SpringBootApplication
public class ErrorHandlingApplication {

public final Random randomNumberGenerator = new Random(System.currentTimeMillis());

public static void main(String[] args) {
SpringApplication.run(ErrorHandlingApplication.class, args);
}

@Bean
public Supplier<Message<String>> randomIntegerPublisher() {
return () -> MessageBuilder
.withPayload(String.valueOf(randomNumberGenerator.nextInt()))
.setHeader(KafkaHeaders.RECEIVED_KEY, 0)
.build();
}

@Bean
public Consumer<KStream<String, String>> errorStream() {
return input -> input
// Remove odd numbers so we throw an exception on every other message
.map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
.filter((key, value) -> (value & 1) == 0)
.map((key, value) -> {
throw new RuntimeException("Pushing uncaught error to kill stream!");
}
);
}

@Bean
public Consumer<KStream<String, String>> errorHandledStream() {
return input -> input
// Remove odd numbers so we throw an exception on ever other message
.map((key, value) -> new KeyValue<>(key, Integer.parseInt(value)))
.filter((key, value) -> (value & 1) == 0)
.map((key, value) -> {
System.out.println("This should not kill the stream");
throw new RuntimeException("Publishing error to be caught!");
}
);
}

@Bean
// TODO - doesn't seem to be working, is this because we're using kstreams?
public Consumer<ErrorMessage> defaultErrorHandler() {
return v -> {
System.out.println("Caught and handling error");
System.out.println(v.toString());
};
}

@Bean
// TODO - not working via the config
/**
* bindings:
* errorHandledStream-in-0:
* consumer:
* commonErrorHandlerBeanName: defaultCommonErrorHandler
*/
public CommonErrorHandler defaultCommonErrorHandler() {
return new CommonLoggingErrorHandler();
}

/**
* Also not working
*/
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setCommonErrorHandler(defaultCommonErrorHandler());
return factory;
}
}

and


spring:
cloud:
function:
definition: randomIntegerPublisher;errorStream;errorHandledStream;defaultErrorHandler
stream:
default:
error-handler-definition: defaultErrorHandler
kafka:
streams:
binder:
deserialization-exception-handler: logandcontinue
bindings:
errorHandledStream-in-0:
error-handler-definition: defaultErrorHandler
consumer:
commonErrorHandlerBeanName: defaultCommonErrorHandler
bindings:
errorHandledStream-in-0:
consumer:
commonErrorHandlerBeanName: defaultCommonErrorHandler
bindings:
randomIntegerPublisher-out-0:
destination: integer-topic
errorStream-in-0:
destination: integer-topic
errorHandledStream-in-0:
destination: integer-topic
error-handler-definition: defaultErrorHandler

Pretty much every documented variation of error handling does not seem to function correctly.

几乎所有记录在案的错误处理变体似乎都不能正常工作。


My first stream, errorStream acts as expected. Killing the relevant consumer (although the global configs should catch this).

我的第一个流errorStream的行为与预期一致。终止相关的使用者(尽管全局配置应该会捕捉到这一点)。


The second stream, errorHandledStream attempts to have config provided that catches the error.

第二个流errorHandledStream尝试提供捕捉错误的配置。


The primary ask, is when exceptions occur within the map method (for this example), to be able to have some exception handler perform an action so that the stream does not crash and restart.

主要的要求是,当map方法中发生异常时(对于本例),能够让某个异常处理程序执行某个操作,以便流不会崩溃和重新启动。


This is all with the latest spring-cloud-streams versions, and the following dependencies.

这都是最新的Spring-Cloud-Streams版本,以及以下依赖项。


extra["springCloudVersion"] = "2022.0.3"

implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams")
implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka")

The following references were used:

引用的参考文献如下:


https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling

Https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-error-handling


https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_error_handling

Https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream-binder-kafka.html#_error_handling


What am I missing here, and/or what reference can I use to review and implementation. OR, is there a working example posted anywhere (or can be provided here) to use as a starting point?

我在这里遗漏了什么,和/或我可以使用什么参考来审查和实施。或者,是否有一个工作示例张贴在任何地方(或可以在这里提供)作为起点?


更多回答
优秀答案推荐

The capability requested here is being added as part of https://github.com/spring-cloud/spring-cloud-stream/issues/2779.

此处请求的功能将作为https://github.com/spring-cloud/spring-cloud-stream/issues/2779.的一部分进行添加


This is accomplished by using https://github.com/spring-cloud/spring-cloud-stream/blob/main/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/DltAwareProcessor.java

这是通过使用https://github.com/spring-cloud/spring-cloud-stream/blob/main/binders/kafka-binder/spring-cloud-stream-binder-kafka-streams/src/main/java/org/springframework/cloud/stream/binder/kafka/streams/DltAwareProcessor.java来实现的


更多回答

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com