gpt4 book ai didi

java - 在 Consumer 之间均匀分布 Kafka 分区

转载 作者:行者123 更新时间:2023-12-01 17:16:48 25 4
gpt4 key购买 nike

我有一个包含 300 个分区的主题,并且有 100 个消费者/机器。我使用 Spring Kafka 作为底层框架来实现 Kafka Consumers。

我使用的是ConcurrentKafkaListenerContainerFactory,并发数设置为3,所以理论上我应该有300个Consumer Container,一个分区应该连接到一个容器,这样分区就均匀分布在100台机器上。

For the first constructor, kafka will distribute the partitions across the consumers. For the second constructor, the ConcurrentMessageListenerContainer distributes the TopicPartition s across the delegate KafkaMessageListenerContainer s.

If, say, 6 TopicPartition s are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartition s, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.

但我没有看到上述行为,我看到一些容器/机器处于空闲状态,而其他容器/机器连接到 6 个分区,这导致 Kafka 主题出现滞后。

我在这里做错了什么吗?如何确保分区在容器之间均匀映射,并且没有容器映射到多个分区?请帮忙。

key.deserializer : StringDeserializer
value.deserializer : [CUSTOM DESERIALIZER]
enable.auto.commit : false
max.poll.records : 5
group.id : [MY GROUP]
partition.assignment.strategy : StickyAssignor
max.partition.fetch.bytes : 1048576
bootstrap.servers : [SERVERS]
auto.commit.interval.ms : 3000
auto.offset.reset : latest


factory.setConcurrency(3);

@KafkaListener(topics = "#{kafkaTopicConfig.getStoreSupply()}", containerFactory = EI_LISTNER_FACTORY)

EI_LISTNER_FACTORY 是一个 Bean..

@Bean(EI_LISTNER_FACTORY)
public ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> eiKafkaListenerContainerFactory() {

Boolean eiCnsumerStartup = [START_UP From Configuration]

Integer concurrentThreadCount = 3;

Map<String, Object> config = [properties from ABOVE]
ConcurrentKafkaListenerContainerFactory<String, AggQuantityByPrimeValue> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(config));
factory.setAutoStartup(eiConsumerStartup);

if (config.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).equals("false")) {
factory.getContainerProperties().setAckMode(AckMode.MANUAL_IMMEDIATE);
factory.setConcurrency(concurrentThreadCount);
}
return factory;

}

最佳答案

配置看起来不错。也许当您描述消费者群体时,很少有消费者变得无法访问/闲置。因此,重新平衡会导致将相同的服务器容器线程分配给多个分区。

如果不是这种情况,请启用 kafka 级别日志来监视分区分配和撤销日志,以检查重新平衡是否触发了所需的结果。

关于java - 在 Consumer 之间均匀分布 Kafka 分区,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61372299/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com