gpt4 book ai didi

java - 什么时候使用 ConcurrentKafkaListenerContainerFactory?

转载 作者:行者123 更新时间:2023-11-30 07:42:16 25 4
gpt4 key购买 nike

我是 kafka 的新手,我经历了 documentation但我什么也听不懂。有人可以解释一下何时使用 ConcurrentKafkaListenerContainerFactory 类吗?我使用了 Kafkaconsumer 类,但我看到 ConcurrentKafkaListenerContainerFactory 在我当前的项目中使用。请解释它的用途。

最佳答案

Kafka 消费者不是线程安全的。所有网络 I/O 都发生在进行调用的应用程序的线程中。确保多线程访问正确同步是用户的责任。非同步访问将导致 ConcurrentModificationException。

If a consumer is assigned multiple partitions to fetch data from, it will try to consume from all of them at the same time, effectively giving these partitions the same priority for consumption. However in some cases consumers may want to first focus on fetching from some subset of the assigned partitions at full speed, and only start fetching other partitions when these partitions have few or no data to consume.

Spring-kafka

ConcurrentKafkaListenerContainerFactory 用于为带注释的方法创建容器 @KafkaListener

spring kafka中有两个MessageListenerContainer

KafkaMessageListenerContainer
ConcurrentMessageListenerContainer

The KafkaMessageListenerContainer receives all message from all topics or partitions on a single thread. The ConcurrentMessageListenerContainer delegates to one or more KafkaMessageListenerContainer instances to provide multi-threaded consumption.

使用 ConcurrentMessageListenerContainer

@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}

它具有并发性。例如,container.setConcurrency(3) 创建三个 KafkaMessageListenerContainer 实例。

If you have six TopicPartition instances are provided and the concurrency is 3; each container gets two partitions. For five TopicPartition instances, two containers get two partitions, and the third gets one. If the concurrency is greater than the number of TopicPartitions, the concurrency is adjusted down such that each container gets one partition.

这是带有文档的清晰示例 here

关于java - 什么时候使用 ConcurrentKafkaListenerContainerFactory?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55023240/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com