gpt4 book ai didi

Spring Kafka 监听器执行器

转载 作者:行者123 更新时间:2023-12-04 17:50:23 25 4
gpt4 key购买 nike

我正在 spring boot 应用程序中设置一个 kafka 监听器,但我似乎无法使用执行程序让监听器在池中运行。这是我的卡夫卡配置:

@Bean
ThreadPoolTaskExecutor messageProcessorExecutor() {
logger.info("Creating a message processor pool with {} threads", numThreads);
ThreadPoolTaskExecutor exec = new ThreadPoolTaskExecutor();
exec.setCorePoolSize(200);
exec.setMaxPoolSize(200);
exec.setKeepAliveSeconds(30);
exec.setAllowCoreThreadTimeOut(true);
exec.setQueueCapacity(0); // Yields a SynchronousQueue
exec.setThreadFactory(ThreadFactoryFactory.defaultNamingFactory("kafka", "processor"));
return exec;
}

@Bean
public ConsumerFactory<String, PollerJob> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroup);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
DefaultKafkaConsumerFactory<String, PollerJob> factory = new DefaultKafkaConsumerFactory<>(props,
new StringDeserializer(),
new JsonDeserializer<>(PollerJob.class));
return factory;
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, PollerJob> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, PollerJob> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(Integer.valueOf(kafkaThreads));
factory.getContainerProperties().setListenerTaskExecutor(messageProcessorExecutor());
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL);
return factory;
}

ThreadFactoryFactoryThreadPoolTaskExecutor 使用只需确保线程的名称类似于 'kafka-1-processor-1' .

ConsumerFactoryENABLE_AUTO_COMMIT_CONFIG标志设置为 false,我使用手动模式进行确认,这是根据 documentation 使用执行程序所必需的.

我的听众看起来像这样:

@KafkaListener(topics = "my_topic",
group = "my_group",
containerFactory = "kafkaListenerContainerFactory")
public void listen(@Payload SomeJob job, Acknowledgment ack) {
ack.acknowledge();
logger.info("Running job {}", job.getId());
....
}

使用管理服务器,我可以检查所有线程,但只有一个 kafka-N-processor-N正在创建线程,但我预计最多会看到 200 个。所有作业都在那个线程上一次运行一个,我不明白为什么。

我怎样才能得到这个设置来使用我的执行器和尽可能多的线程来运行监听器?

我正在使用 Spring Boot 1.5.4.RELEASE 和 kafka 0.11.0.0。

最佳答案

如果您的主题只有一个分区,根据消费者组策略,只有一个消费者能够轮询该分区。

ConcurrentMessageListenerContainer 确实创建了与所提供的concurrency 一样多的目标KafkaMessageListenerContainer 实例。只有在不知道主题中的分区数量的情况下,它才会这样做。

当消费者组重新平衡时,只有一个消费者获得分区进行消费。所有的工作实际上都是在一个线程中完成的:

 private void startInvoker() {
ListenerConsumer.this.invoker = new ListenerInvoker();
ListenerConsumer.this.listenerInvokerFuture = this.containerProperties.getListenerTaskExecutor()
.submit(ListenerConsumer.this.invoker);
}

一个分区 - 一个线程用于顺序记录处理。

关于Spring Kafka 监听器执行器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45487952/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com