gpt4 book ai didi

java - 使用 Spring Kafka 批处理消费者进行非阻塞重试

转载 作者:行者123 更新时间:2023-12-05 02:34:57 32 4
gpt4 key购买 nike

我正在使用 spring-kafka 2.8.0 并且我正在尝试实现 non-blocking retries对于batch kafka consumer .这是我的配置和消费者:

@Configuration
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>>
batchListenerFactory(ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
return factory;
}
}
@Component
public class MyConsumer {

@KafkaListener(
topics = "my-topic",
containerFactory = "batchListenerFactory"
)
@RetryableTopic(
backoff = @Backoff(delay = 1000, multiplier = 2.0),
attempts = "4",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE,
autoCreateTopics = "false"
)
public void consume(List<ConsumerRecord<String, GenericRecord>> messages) {
// do some stuff
}

}

但是在 sturtup 上我得到了以下异常:java.lang.IllegalArgumentException:提供的类 BatchMessagingMessageListenerAdapter 不可从 AcknowledgingConsumerAwareMessageListener 分配

我的问题是:

  1. 有什么方法可以将批量消费者与@RetryableTopic结合起来吗?

  2. 还有其他方法可以为批处理消费者实现非阻塞重试吗?是否可以为此目的使用 RetryTemplate

最佳答案

@RetryableTopic 不支持批量监听器。

RecoveringBatchErrorHandler(DefaultErrorHandler 用于 2.8 及更高版本)支持在监听器抛出 BatchListenerFailedException 表示失败的记录。

然后您必须针对该主题实现自己的监听器。

关于java - 使用 Spring Kafka 批处理消费者进行非阻塞重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70663101/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com