gpt4 book ai didi

java - 如何将 RecordInterceptor 设置为 ConcurrentKafkaListenerContainerFactory

转载 作者:行者123 更新时间:2023-11-30 09:59:49 25 4
gpt4 key购买 nike

我正在使用 Spring Kafka 2.2.7,我已经使用 kafkaListenerContainerFactory 配置了 @EnableKafka 并使用 @KafkaListener 来消费消息,一切正在按预期工作。

我想添加一个 RecordInterceptor 来记录所有消费的消息,但发现很难配置它。 documentation指出可以在容器上设置 RecordInterceptor,但是我不确定如何获取容器的实例。

Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record.

    @Bean
public ConcurrentKafkaListenerContainerFactory<String, Bytes> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Bytes> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(createConsumerFactory());
factory.setConcurrency(consumerCount);
return factory;
}

我查看了 Spring 文档,但没有找到解决方案,这似乎是一件简单的事情,但也许我错过了一些东西。

如有任何帮助,我们将不胜感激。

提前致谢。

最佳答案

有一个方法setRecordInterceptor2.2.7

factory.setRecordInterceptor(new RecordInterceptor);

还有另一个信息RecordInterceptor不适用于批处理监听器

Starting with version 2.2.7, you can add a RecordInterceptor to the listener container; it will be invoked before calling the listener allowing inspection or modification of the record. If the interceptor returns null, the listener is not called. The interceptor is not invoked when the listener is a batch listener.

关于java - 如何将 RecordInterceptor 设置为 ConcurrentKafkaListenerContainerFactory,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58584773/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com