gpt4 book ai didi

java - 每个主题是否可以有一个 Kafka 消费者线程?

转载 作者:行者123 更新时间:2023-11-30 06:01:58 32 4
gpt4 key购买 nike

我们有使用 Spring-Kafka (2.1.7) 的 Springboot 应用程序。我们启用了并发性,因此每个分区可以有一个消费者线程。所以目前,如果我们有 3 个主题,每个主题有 2 个分区,将有 2 个消费者线程,如下所示:

ConsumerThread1 - [topic1-0, topic2-0, topic3-0]
ConsumerThread2 - [topic1-1, topic2-1, topic3-1]

但是,我们希望每个主题有一个消费者线程,而不是每个分区一个 KafkaListener(或消费者线程)。例如:

ConsumerThread1 - [topic1-0, topic1-1]
ConsumerThread2 - [topic2-0, topic2-1]
ConsumerThread3 - [topic3-0, topic3-1]

如果那是不可能的,即使是下面的设置也可以:

ConsumerThread1 - [topic1-0]
ConsumerThread2 - [topic1-1]
ConsumerThread3 - [topic2-0]
ConsumerThread4 - [topic2-1]
ConsumerThread5 - [topic3-0]
ConsumerThread6 - [topic3-1]

要注意的是,我们事先并不知道完整的主题列表(我们使用的是通配符主题模式)。可以随时添加新主题,并且应该在运行时为这个新主题动态创建一个(或多个)新消费者线程。

有什么办法可以实现吗?

最佳答案

您可以为来自 spring-kafka:2.2 的每个主题创建单独的容器并将并发设置为 1,以便每个容器都从每个主题消费

Starting with version 2.2, you can use the same factory to create any ConcurrentMessageListenerContainer. This might be useful if you want to create several containers with similar properties or you wish to use some externally configured factory, such as the one provided by Spring Boot auto-configuration. Once the container is created, you can further modify its properties, many of which are set by using container.getContainerProperties(). The following example configures a ConcurrentMessageListenerContainer:

@Bean
public ConcurrentMessageListenerContainer<String, String>(
ConcurrentKafkaListenerContainerFactory<String, String> factory) {

ConcurrentMessageListenerContainer<String, String> container =
factory.createContainer("topic1", "topic2");
container.setMessageListener(m -> { ... } );
return container;
}

注意:以这种方式创建的容器不会添加到端点注册表中。它们应该被创建为 @Bean 定义,以便它们在应用程序上下文中注册。

关于java - 每个主题是否可以有一个 Kafka 消费者线程?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/56264681/

32 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com