gpt4 book ai didi

java - ConcurrentMessageListenerContainer 在分区存在时减少并发计数

转载 作者:行者123 更新时间:2023-12-02 09:14:31 25 4
gpt4 key购买 nike

我有一个 Spring Boot 应用程序,用于向 kafka 生成消息。该应用程序在 6 个实例上每天在高流量下运行 1000 万个请求。我也有一个 spring boot kafka 消费者应用程序。但该应用程序有 2 个实例,这些实例无法消耗所有消息,因为该应用程序正在运行单线程。我的主题有 4 个分区,我想根据分区计数执行消费者应用程序多线程。但我不确定我的代码是否有效。

Spring Kafka maven 依赖

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.1.7.RELEASE</version>
</dependency>

配置类

@Configuration
@EnableKafka
public class KafkaListenerConfig {

private final KafkaListenerProperties kafkaListenerProperties;

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(4);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(kafkaConsumerProps());
}
@Bean
public Map<String, Object> kafkaConsumerProps() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrap());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroup());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return props;
}

}

根据我的主题的分区计数,我将并发字段设置为 4

factory.setConcurrency(4);

主题分区:

        Topic:fraudSSCALogs     PartitionCount:4        ReplicationFactor:1     Configs:
Topic: fraudSSCALogs Partition: 0 Leader: 62 Replicas: 62 Isr: 62
Topic: fraudSSCALogs Partition: 1 Leader: 166 Replicas: 166 Isr: 166
Topic: fraudSSCALogs Partition: 2 Leader: 270 Replicas: 270 Isr: 270
Topic: fraudSSCALogs Partition: 3 Leader: 167 Replicas: 167 Isr: 167

监听类

public class SSCAReqResLogListener {

@KafkaListener(id= "consumer0",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"0"})})
public void receive1(ConsumerRecord<String, String> record) {
proceed(record);
}

@KafkaListener(id= "consumer1",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"1"})})
public void receive2(ConsumerRecord<String, String> record) {
proceed(record);
}

@KafkaListener(id= "consumer2",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"2"})})
public void receive3(ConsumerRecord<String, String> record) {
proceed(record);
}

@KafkaListener(id= "consumer3",topicPartitions = {@TopicPartition(topic="${kafka.consumer.topic}", partitions = {"3"})})
public void receive4(ConsumerRecord<String, String> record) {
proceed(record);
}

private synchronized void proceed(ConsumerRecord<String, String> record) {
log.info("Message Received: Topic: {}, Partition: {}, Value: {}", record.topic(), record.partition(), record.value());
}

}

当我的应用程序运行时,sp​​ring会发出警告;

WARN  [main] org.springframework.kafka.listener.ConcurrentMessageListenerContainer: When specific partitions are provided, the concurrency must be less than or equal to the number of partitions; reduced from 4 to 1

我的配置有什么问题?我可以做什么来解决这个警告?

最佳答案

当您显式定义分区时,并发只能设置为小于或等于分区数的数字。

您只为每个主题定义了一个分区,因此并发数只能设置为1。代码健全性会检查这一点并自动降低并发级别并向您发出警告。

@KafkaListener(id= "consumer0",topicPartitions = {@TopicPartition(
topic="${kafka.consumer.topic}",
partitions = {"0"} // <-- Only one partition
)})

多个分区的配置如下:

@KafkaListener(id= "consumer0",topicPartitions = {@TopicPartition(
topic="${kafka.consumer.topic}",
partitions = {"0", "1", "2", "3" }
)})

关于java - ConcurrentMessageListenerContainer 在分区存在时减少并发计数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59105911/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com