gpt4 book ai didi

spring - Kafka 监听器中的钩子(Hook)

转载 作者:行者123 更新时间:2023-12-04 01:35:05 34 4
gpt4 key购买 nike

kafka 收听消息之前/之后是否有可用的 Hook ?

用例:必须设置 MDC 关联 id 才能执行日志可追溯性

我在寻找什么?一个之前/之后的回调方法,以便可以在进入时设置 MDC 关联 ID,并最终在退出时清理 MDC。

编辑场景:我正在获取关联 ID 作为 Kafka header 的一部分,我想在 Kafka Listener 中收到消息后立即在 MDC 中设置相同的 ID

感谢帮助

最佳答案

您可以向监听器 bean 添加环绕建议...

@SpringBootApplication
public class So59854374Application {

public static void main(String[] args) {
SpringApplication.run(So59854374Application.class, args);
}

@Bean
public static BeanPostProcessor bpp() { // static is important
return new BeanPostProcessor() {

@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof MyListener) {
ProxyFactoryBean pfb = new ProxyFactoryBean();
pfb.setTarget(bean);
pfb.addAdvice(new MethodInterceptor() {

@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
try {
System.out.println("Before");
return invocation.proceed();
}
finally {
System.out.println("After");
}
}

});
return pfb.getObject();
}
return bean;
}

};
}

@Bean
public NewTopic topic() {
return TopicBuilder.name("so59854374").partitions(1).replicas(1).build();
}

@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("so59854374", "foo");
}

}

@Component
class MyListener {

@KafkaListener(id = "so59854374", topics = "so59854374")
public void listen(String in) {
System.out.println(in);
}

}

Before
foo
After

编辑

如果您将 @Header("myMdcHeader") byte[] mdc 作为附加参数添加到您的 kafka 监听器方法中,则可以使用 getArguments()[1]关于调用。

另一种解决方案是向监听器容器工厂添加一个 RecordInterceptor,它允许您在将原始 ConsumerRecord 传递给监听器适配器之前访问它。

/**
* An interceptor for {@link ConsumerRecord} invoked by the listener
* container before invoking the listener.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
* @since 2.2.7
*
*/
@FunctionalInterface
public interface RecordInterceptor<K, V> {

/**
* Perform some action on the record or return a different one.
* If null is returned the record will be skipped.
* @param record the record.
* @return the record or null.
*/
@Nullable
ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);

}
/**
* Set an interceptor to be called before calling the listener.
* Does not apply to batch listeners.
* @param recordInterceptor the interceptor.
* @since 2.2.7
*/
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}

如果您使用的是批处理监听器,Kafka 会提供一个ConsumerInterceptor

关于spring - Kafka 监听器中的钩子(Hook),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59854374/

34 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com