gpt4 book ai didi

java - 如何使用spring boot设置kafka消费者并发

转载 作者:行者123 更新时间:2023-12-02 08:13:16 26 4
gpt4 key购买 nike

我正在编写一个基于 Java 的 Kafka Consumer 应用程序。我正在为我的应用程序使用 kafka-clients、Spring Kafka 和 Spring boot。虽然 Spring boot 让我可以轻松编写 Kafka Consumers(无需真正编写 ConcurrentKafkaListenerContainerFactory、ConsumerFactory 等),但我希望能够为这些消费者定义/自定义一些属性。但是,我找不到使用 Spring boot 的简单方法。例如:我有兴趣设置的一些属性是 -

ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG

我查看了 Spring Boot 预定义属性 here .

此外,基于之前的问题 here ,我想在消费者上设置并发,但找不到配置、application.properties 驱动的方法来使用 Spring Boot 来实现这一点。

一个明显的方法是定义 ConcurrentKafkaListenerContainerFactory, ConsumerFactory在我的 Spring Context 中再次上课并从那里开始工作。我想了解是否有更简洁的方法来做到这一点,特别是因为我正在使用 Spring Boot。

版本 -

  • kafka-clients - 0.10.0.0-SASL
  • spring-kafka - 1.1.0.RELEASE
  • Spring Boot - 1.5.10.RELEASE

最佳答案

在您引用的网址处,向下滚动至

spring.kafka.listener.concurrency= # Number of threads to run in the listener containers.

spring-kafka - 1.1.0.RELEASE

我建议至少升级到1.3.5;由于 KIP-62,它有一个更简单的线程模型。

编辑

使用 Boot 2.0,您可以设置任意生产者、消费者、管理员、通用属性,如 in the boot documentation 中所述。 .

spring.kafka.consumer.properties.heartbeat.interval.ms

在 Boot 1.5 中,只有 spring.kafka.properties,如所述 here .

这会设置生产者和消费者的属性,但您可能会在日志中看到一些有关生产者未使用/不支持的属性的噪音。

或者,您可以简单地覆盖 Boot 的消费者工厂并根据需要添加属性...

@Bean
public ConsumerFactory<?, ?> kafkaConsumerFactory(KafkaProperties properties) {
Map<String, Object> consumerProps = properties.buildConsumerProperties();
consumerProps.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 5_000);
return new DefaultKafkaConsumerFactory<Object, Object>(consumerProps);
}

关于java - 如何使用spring boot设置kafka消费者并发,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50263755/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com