gpt4 book ai didi

java - 使用 spring kafka 消费者恢复 kafka 稳定组

转载 作者:行者123 更新时间:2023-12-01 19:54:10 25 4
gpt4 key购买 nike

我的消费者速度相当慢,可能需要 5 分钟以上才能处理记录。我想避免的是 kafka 重新稳定了团队。根据我的理解,为了做到这一点,我必须为 kafka 代理设置以下属性:

  group.max.session.timeout.ms = 3600001 
group.min.session.timeout.ms = 3600000

在我的应用程序端,我有以下配置:

    @Bean
public Map<String, Object> consumerConfigs() {
final Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
environment.getProperty("app.kafkaBrokers"));
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, Integer.valueOf(environment.getProperty("app.session.timeout.ms")) );
propsMap.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, Integer.valueOf(environment.getProperty("app.session.timeout.ms")) + 1 );
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
return propsMap;
}

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
final ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(9);// was 3
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}

在我的听众中我也有:

 @KafkaListener(id = "baz", topics = "tipJobsForExecution", containerFactory="kafkaListenerContainerFactory")
public void listen(ConsumerRecord<?, ?> record)

我的监听器大约需要 5 分钟来处理消息。完成后,我在 kafka 代理端阅读了以下内容:

2018-05-03 10:29:11,210] INFO [GroupCoordinator 0]: Preparing to rebalance group baz with old generation 22 (__consumer_offsets-7) (kafka.coordinator.group.GroupCoordinator)

据我了解,kafka 认为消费者已经死亡并重新平衡该组。我的问题是为什么会发生这种情况?我的一个想法是,也许心跳不是应有的每 3000 毫秒心跳一次,但我不知道如何解决这个问题。

提前致谢,扬尼斯

最佳答案

您必须了解 Kafka Consumer 的三种类型的超时配置参数。

heartbeat.interval.ms - 使用 Kafka 的组管理工具时向消费者协调器发出心跳的预期时间。通常应为 session.timeout 值的 1/3 默认值 - 3000 ms

session.timeout.ms - 如果在此 session 超时到期之前代理没有收到心跳,则代理将从组中删除此消费者并启动重新平衡。默认值 10000

ma​​x.poll.interval.ms - 如果在此超时到期之前未调用 poll(),则消费者被视为失败,组将重新平衡默认值 - 300000

在您的情况下,轮询间隔设置的值似乎太低。

引用 - https://kafka.apache.org/documentation/#newconsumerconfigs

关于java - 使用 spring kafka 消费者恢复 kafka 稳定组,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50149482/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com