gpt4 book ai didi

java - 如何在 Spring Kafka的消费者消费之前过滤Kafka消息

转载 作者:塔克拉玛干 更新时间:2023-11-01 22:20:13 25 4
gpt4 key购买 nike

我在我的项目中使用 spring Kafka,我想在消费者基于键和值消费之前过滤消息。

这可能吗?

最佳答案

是的,在 Spring Kafka 中,您可以在消费者消费之前过滤消息,有一个接口(interface) public interface RecordFilterStrategy<K,V>和该接口(interface)中的方法 boolean filter(org.apache.kafka.clients.consumer.ConsumerRecord<K,V> consumerRecord)

所以你需要覆盖这个filter方法,如果返回false则消费者消费该消息,否则返回true则不消费该消息(会被过滤掉)

您可以在消息键、值或 header 上应用此过滤

consumerRecord.key() // will return key of message
consumerRecord.value() // will return the message
consumerRecord.headers() // will return the headers

示例代码:

 @Bean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConcurrency(Integer.parseInt(threads));
factory.setBatchListener(true);
factory.setConsumerFactory(kafkaConsumerFactory());
factory.getContainerProperties().setPollTimeout(Long.parseLong(pollTimeout));
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.BATCH);

if(true) {
factory.setRecordFilterStrategy(new RecordFilterStrategy<String, String>() {

@Override
public boolean filter(ConsumerRecord<String, String> consumerRecord) {
if(consumerRecord.key().equals("ETEST")) {
return false;
}
else {
return true;
}
}
});
}

return factory;
}

关于java - 如何在 Spring Kafka的消费者消费之前过滤Kafka消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51510653/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com