gpt4 book ai didi

spring-kafka - 寻找在 Spring Kafka 2.1.0 中使用自定义 ConsumerAwareRebalanceListener 的工作示例

转载 作者:行者123 更新时间:2023-12-01 05:57:24 27 4
gpt4 key购买 nike

根据文档:

ContainerProperties has a property consumerRebalanceListener which takes an implementation of the Kafka client’s ConsumerRebalanceListener interface. If this property is not provided, the container will configure a simple logging listener that logs rebalance events under the INFO level. The framework also adds a sub-interface ConsumerAwareRebalanceListener [...]



然而,似乎没有注入(inject)点或生命周期阶段来实际将自定义重新平衡监听器分配给 ContainerProperties。 .

在我的情况下,我使用自动配置,bean 主要由 KafkaAnnotationDrivenConfiguration 提供和 KafkaAutoConfiguration .

与许多 Spring 类和组件不同,无法通过定义特定类型的 bean 来设置自定义重新平衡监听器,唯一的选择似乎是对现有 bean 进行黑客攻击和子类化。

或者我在这里缺少什么?

最佳答案

ConcurrentKafkaListenerContainerFactoryConfigurer如需延期,请调用super.configure()并使用:

ContainerProperties containerProperties = listenerContainerFactory
.getContainerProperties();

访问 @KafkaListener 的容器属性.并且已经在那里你可以注入(inject)你的 RebalanceListener .

正确的:那个习惯 ConcurrentKafkaListenerContainerFactoryConfigurer必须声明为 @Bean :
@Bean
public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerFactoryConfigurer(
ConsumerAwareRebalanceListener rebalanceListener) {

return new ConcurrentKafkaListenerContainerFactoryConfigurer() {

@Override
public void configure(ConcurrentKafkaListenerContainerFactory<Object, Object> listenerContainerFactory,
ConsumerFactory<Object, Object> consumerFactory) {
super.configure(listenerContainerFactory, consumerFactory);
listenerContainerFactory.getContainerProperties()
.setConsumerRebalanceListener(rebalanceListener);
}

};

}

正是从 KafkaAnnotationDrivenConfiguration 中使用了这个 bean build kafkaListenerContainerFactory bean :
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {

关于spring-kafka - 寻找在 Spring Kafka 2.1.0 中使用自定义 ConsumerAwareRebalanceListener 的工作示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48533466/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com