gpt4 book ai didi

java - kafka @Listener异常处理-无法配置批量重试

转载 作者:搜寻专家 更新时间:2023-11-01 03:17:51 24 4
gpt4 key购买 nike

我是 Kafka 的新手。我试图弄清楚并理解错误场景如何为 @Listener batch consumer factory 工作。

我在做什么...

我在 batch 过程中使用来自 topic 的记录,并将它们插入到 DB 中,如下所示 ...

@KafkaListener( topics = "KAFKA.TEST")
public Boolean listen(List<ConsumerRecord<String, User>> list) throws Exception {
Boolean result = null;
List<User> userList = new ArrayList<>();
for (ConsumerRecord<String, User> record : list) {
User user = record.value();
userList.add(user);
}
if(userList.size()>0) {
result = dbService.insertBatchUser(userList);
LOGGER.info(" users inserted " + userList.size());
}
else
LOGGER.info(" No users found in the topic ");

countDownLatch.countDown();
return result;
}

我的问题

  1. 如果任何批处理由于数据库不可用而无法插入数据库,如何重试
  2. 如何测试 Kafka 服务器是否正在运行并能够连接到特定主题 - 为什么我问这个问题是因为我在停止 zookeeperKafka 服务器本地但没有错误或异常。 Kafka Producer 我的意思是 Template 在停止 Kafka Server 后发送消息抛出错误,但在 Listener 处未发现错误

已添加

我的配置

@Bean
public ConsumerFactory consumerFactory(){
return new DefaultKafkaConsumerFactory(consumerConfigs(),stringKeyDeserializer(),jsonValueDeserializer());
}
@Bean
public RetryPolicy getRetryPolicy(){
SimpleRetryPolicy simpleRetryPolicy = new SimpleRetryPolicy();
simpleRetryPolicy.setMaxAttempts(getMaxRetryAttempts());
return simpleRetryPolicy;
}

@Bean
public FixedBackOffPolicy getBackOffPolicy() {
FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
backOffPolicy.setBackOffPeriod(getRetryInterval());
return backOffPolicy;
}

@Bean
public RetryTemplate getRetryTemplate(){
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(getRetryPolicy());
retryTemplate.setBackOffPolicy(getBackOffPolicy());
return retryTemplate;
}

@Bean
public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory(){
ConcurrentKafkaListenerContainerFactory factory = new ConcurrentKafkaListenerContainerFactory();
factory.setConcurrency(getConcurrency());
factory.getContainerProperties().setPollTimeout(getPollTimeout());
factory.setConsumerFactory(consumerFactory());
factory.setBatchListener(true);
factory.setRetryTemplate(getRetryTemplate());
return factory;
}

这里我使用的是org.springframework.retry.support.RetryTemplate

异常我得到

java.lang.ClassCastException: org.springframework.kafka.listener.adapter.BatchMessagingMessageListenerAdapter cannot be cast to org.springframework.kafka.listener.MessageListener
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupMessageListener(AbstractKafkaListenerEndpoint.java:306) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.config.AbstractKafkaListenerEndpoint.setupListenerContainer(AbstractKafkaListenerEndpoint.java:282) ~[spring-kafka-1.1.2.RELEASE.jar:na]
at org.springframework.kafka.config.AbstractKafkaListenerContainerFactory.createListenerContainer(AbstractKafkaListenerContainerFactory.java:211) ~[spring-kafka-1.1.2.RELEASE.jar:na]

最佳答案

参见 Retrying Deliveries .

When using @KafkaListener, set the RetryTemplate (and optionally recoveryCallback) on the container factory and the listener will be wrapped in the appropriate retrying adapter.

新的 Kafka 客户端(0.9.x.x 或 0.10.x.x)不直接与 zookeeper 对话,只是与 kafka 服务器本身对话。

客户端在内部不断尝试重新连接;打开调试日志记录以查看 Activity 。

关于java - kafka @Listener异常处理-无法配置批量重试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42000710/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com