gpt4 book ai didi

java - 使用 Spring-Kafka-2.3.0 及更高版本使用主题中的消息并以指数退避重试,直到成功为止

转载 作者:行者123 更新时间:2023-12-02 09:05:49 25 4
gpt4 key购买 nike

以下是配置

@Bean
@Autowired
public ConcurrentKafkaListenerContainerFactory<byte[], byte[]> kafkaListContFactory(@Qualifier("retryTemplate") RetryTemplate retryTemplate, @Qualifier("batchErrorHandler") ErrorHandler errorHandler, @Qualifier("batchErrorHandler") BatchErrorHandler batchErrorHandler) {
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConsumerFactory(consumerFactory());
factory.getContainerProperties().setAckOnError(false);
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
factory.setErrorHandler(errorHandler);
factory.getContainerProperties().setAckOnError(false);
factory.setStatefulRetry(true);
factory.setRetryTemplate(retryTemplate);
}

**Retry config**

@Bean("retryTemplate")
public RetryTemplate retryTemplate() {
final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(delay);
backOffPolicy.setMultiplier(multiplier);
backOffPolicy.setMaxInterval(20000);
final RetryTemplate template = new RetryTemplate();
template.setRetryPolicy(new AlwaysRetryPolicy());
template.setBackOffPolicy(backOffPolicy);
return template;
}

SeekToCurrentErrorHandler config
I do not want to recover and try to retry till it succeeds so I have given maxAttempts to -1
@Bean("errorHandler")
public ErrorHandler errorHandler() {
final ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
SeekToCurrentErrorHandler handler = new SeekToCurrentErrorHandler((r, t) -> {
if (t != null && (t instanceof RetryServiceException || t.getCause() instanceof RetryServiceException)) {
logger.error("SeekToCurrentErrorHandler recoverer failure", t.getMessage());
throw new RetryServiceException("SeekToCurrentErrorHandler recoverer failure");
}
}, -1);
return handler;
}

最后,当没有发生异常时,我会在 @KafkaListener 方法中进行确认。

我的问题是,如果我将 -1 配置为最大尝试次数,并且我的错误处理程序将负责重试,我是否需要 retryTemplate?但重试不会发生无限次,问题是如果我获取批处理记录,如果其中一条消息在轮询中失败,则我正在处理相同的消息,所有消息都将被重新处理。

我需要使用batchErrorHandler并实现指数退避策略,以便重试是有状态的,并且应该避免重新处理相同的成功消息。谁能帮忙解决上述问题。

我需要避免通过错误使用ma​​x.poll.interval.ms来重新平衡分区

最佳答案

您需要使用Stateful Retry避免重新平衡;但是,对于现代版本,您根本不需要监听器级别重试,因为您现在可以在错误处理程序级别执行重试并退出。

使用构造函数接受 BackOff 并删除容器重试模板。

/**
* Construct an instance with the provided recoverer which will be called after
* the backOff returns STOP for a topic/partition/offset.
* @param recoverer the recoverer; if null, the default (logging) recoverer is used.
* @param backOff the {@link BackOff}.
* @since 2.3
*/
public SeekToCurrentErrorHandler(@Nullable BiConsumer<ConsumerRecord<?, ?>, Exception> recoverer, BackOff backOff) {

I need to use batchErrorHandler and implement a exponential backoff strategy so that retry will be stateful and reprocessing of same success messages should be avoided. Could anyone help with the above issue.

框架无法帮助批处理监听器,因为它不知道失败发生在何处(在批处理中)。

关于java - 使用 Spring-Kafka-2.3.0 及更高版本使用主题中的消息并以指数退避重试,直到成功为止,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59819604/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com