
spring-boot - Spring Kafka and the number of topic consumers

Reposted. Author: 行者123. Updated: 2023-12-04 05:37:53

In my Spring Boot/Kafka project, I have the following consumer configuration:

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(String.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory(kafkaProperties));
        factory.setConcurrency(10);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, Post> postConsumerFactory(KafkaProperties kafkaProperties) {
        return new DefaultKafkaConsumerFactory<>(kafkaProperties.buildConsumerProperties(), new StringDeserializer(), new JsonDeserializer<>(Post.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(KafkaProperties kafkaProperties) {
        ConcurrentKafkaListenerContainerFactory<String, Post> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory(kafkaProperties));
        return factory;
    }

}

Here is my PostConsumer:

@Component
public class PostConsumer {

    @Autowired
    private PostService postService;

    @KafkaListener(topics = "${kafka.topic.post.send}", containerFactory = "postKafkaListenerContainerFactory")
    public void sendPost(ConsumerRecord<String, Post> consumerRecord) {
        postService.sendPost(consumerRecord.value());
    }

}

And application.properties:

spring.kafka.bootstrap-servers=${kafka.host}:${kafka.port}
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.group-id=groupname
spring.kafka.consumer.enable-auto-commit=false
kafka.topic.post.send=post.send
kafka.topic.post.sent=post.sent
kafka.topic.post.error=post.error

As you can see, I added factory.setConcurrency(10); but it has no effect. Every PostConsumer.sendPost call runs on the same thread, named org.springframework.kafka.KafkaListenerEndpointContainer#1-8-C-1.

I would like to control the number of concurrent PostConsumer.sendPost listeners so that they work in parallel. Please tell me how to achieve this with Spring Boot and Spring Kafka.

Best answer

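The issue here is the consistency that Spring Kafka pursues with the Apache Kafka Consumer. Concurrency is distributed across the partitions of the provided topics. If you have only one topic with a single partition, there will indeed be no concurrency. The point is that all records from one partition are consumed on the same thread.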
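As a rough sketch of what this implies for the configuration in the question: the topic needs more than one partition, and the concurrency has to be set on the factory the listener actually references. In the code as posted, setConcurrency(10) is called on kafkaListenerContainerFactory, while the @KafkaListener uses postKafkaListenerContainerFactory. The fragment below is an assumption-laden illustration, not the asker's or the answerer's code: the class name PostListenerConcurrencyConfig and the bean postSendTopic are hypothetical, the partition count and replication factor are example values, and it assumes spring-kafka is on the classpath, a KafkaAdmin bean is available (Spring Boot normally auto-configures one), and the topic does not exist yet.

```java
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Hypothetical configuration class; Post is the payload type from the question
// and is assumed to live in the same package.
@Configuration
public class PostListenerConcurrencyConfig {

    // Assumption: the application creates the topic itself.
    // 10 partitions and replication factor 1 are example values only.
    @Bean
    public NewTopic postSendTopic() {
        return new NewTopic("post.send", 10, (short) 1);
    }

    // Replaces the factory of the same name from the question, so that the
    // concurrency is set on the factory the listener refers to.
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, Post> postKafkaListenerContainerFactory(
            ConsumerFactory<String, Post> postConsumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, Post> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(postConsumerFactory);
        // At most 10 consumer threads; only useful if the topic has enough partitions.
        factory.setConcurrency(10);
        return factory;
    }
}
```

With a single-partition topic, this change alone would still leave one active consumer thread, which is the behaviour described in the answer.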

The documentation has some information on this: https://docs.spring.io/spring-kafka/docs/2.1.7.RELEASE/reference/html/_reference.html#_concurrentmessagelistenercontainer

If, say, 6 TopicPartitions are provided and the concurrency is 3; each container will get 2 partitions. For 5 TopicPartitions, 2 containers will get 2 partitions and the third will get 1. If the concurrency is greater than the number of TopicPartitions, the concurrency will be adjusted down such that each container will get one partition.
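The arithmetic in that passage can be reproduced with a few lines of plain Java. This is only an illustration of the numbers quoted above, not Spring Kafka's actual distribution code; the class PartitionSpread and the method spread are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSpread {

    /**
     * Returns the number of partitions each container would receive,
     * following the examples in the quoted documentation.
     */
    static List<Integer> spread(int partitions, int concurrency) {
        if (partitions < 1 || concurrency < 1) {
            throw new IllegalArgumentException("partitions and concurrency must be positive");
        }
        // Concurrency is adjusted down when it exceeds the partition count.
        int containers = Math.min(concurrency, partitions);
        int base = partitions / containers;   // every container gets at least this many
        int extra = partitions % containers;  // the first 'extra' containers get one more
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < containers; i++) {
            counts.add(i < extra ? base + 1 : base);
        }
        return counts;
    }

    public static void main(String[] args) {
        System.out.println(spread(6, 3));   // prints [2, 2, 2]
        System.out.println(spread(5, 3));   // prints [2, 2, 1]
        System.out.println(spread(1, 10));  // prints [1]
    }
}
```

The last call mirrors the situation in the question: with one partition, a concurrency of 10 still yields a single active container.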

And the JavaDocs:

/**
 * The maximum number of concurrent {@link KafkaMessageListenerContainer}s running.
 * Messages from within the same partition will be processed sequentially.
 * @param concurrency the concurrency.
 */
public void setConcurrency(int concurrency) {
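The guarantee in that JavaDoc, sequential processing within a partition with parallelism only across partitions, can be imitated in plain Java by giving every partition its own single-threaded worker. The sketch below is an analogy under that assumption, not how Spring Kafka is implemented; PerPartitionOrdering and process are hypothetical names, and the "records" are just offsets.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class PerPartitionOrdering {

    /**
     * Handles recordsPerPartition fake records for each partition and returns,
     * per partition, entries of the form "offset@threadName" in processing order.
     */
    static Map<Integer, List<String>> process(int partitions, int recordsPerPartition) {
        Map<Integer, List<String>> seen = new ConcurrentHashMap<>();
        List<ExecutorService> workers = new ArrayList<>();
        for (int partition = 0; partition < partitions; partition++) {
            // One single-threaded worker per partition: work for different
            // partitions can overlap, work within one partition cannot.
            ExecutorService worker = Executors.newSingleThreadExecutor();
            workers.add(worker);
            List<String> log = Collections.synchronizedList(new ArrayList<>());
            seen.put(partition, log);
            for (int offset = 0; offset < recordsPerPartition; offset++) {
                final int current = offset;
                worker.execute(() -> log.add(current + "@" + Thread.currentThread().getName()));
            }
        }
        for (ExecutorService worker : workers) {
            worker.shutdown();
            try {
                worker.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return seen;
    }

    public static void main(String[] args) {
        // Each partition's list shows ascending offsets, all tagged with one thread name.
        process(3, 5).forEach((partition, log) -> System.out.println(partition + " -> " + log));
    }
}
```

Running it with one partition shows a single thread name for every record, which corresponds to the single listener thread observed in the question.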

Regarding spring-boot - Spring Kafka and the number of topic consumers, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/50895344/
