gpt4 book ai didi

spring - @KafkaListener 为每个监听器单独过滤逻辑

转载 作者:行者123 更新时间:2023-12-04 10:22:45 28 4
gpt4 key购买 nike

我需要为监听器工厂生成的每个监听器定义自定义过滤策略。
目前,我正在使用 RecordFilterStrategy要做到这一点:

@Bean
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> kafkaListenerContainerFactoryProject() {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setRecordFilterStrategy(new RecordFilterStrategy<String, GenericRecord>() {
@Override
public boolean filter(ConsumerRecord<String, GenericRecord> consumerRecord) {
return true;
}
});
return factory;
}

但是这种过滤适用于该工厂生产的所有监听器。我需要的是为每个监听器定义不同的逻辑:
@Component
@SendTo("out")
@KafkaListener(topics = "incoming")
public class TestListener {

@Filter
public boolean filter(){
return true;
}

@KafkaHandler
public TestObject listener(TestObject testObject) {
log.debug("Received Message: " + testObject);
return testObject;
}

}

spring-kafka 是否有一些工具可以做到这一点?或者我需要自己编写这样的逻辑?

提前致谢!

最佳答案

不,你没有。您只需要一套ConcurrentKafkaListenerContainerFactory bean 类特别RecordFilterStrategy .那么你的 @KafkaListener应该只指定它们基于哪个工厂:

/**
* The bean name of the {@link org.springframework.kafka.config.KafkaListenerContainerFactory}
* to use to create the message listener container responsible to serve this endpoint.
* <p>If not specified, the default container factory is used, if any.
* @return the container factory bean name.
*/
String containerFactory() default "";

关于spring - @KafkaListener 为每个监听器单独过滤逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60770380/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com