gpt4 book ai didi

java - spring-kafka中多个KafkaListener实现共享逻辑

转载 作者:行者123 更新时间:2023-11-30 07:46:52 46 4
gpt4 key购买 nike

我的 Spring Boot 应用程序包含几个 @KafkaListeners ,并且每个监听器在实际处理有效载荷之前和之后执行相同的步骤:验证有效载荷,检查事件是否已经被处理,检查它是否是逻辑删除(空)消息,决定是否应该在失败的情况下重试处理,发出指标等。

这些步骤目前是在一个基类中实现的,但是由于传递给@KafkaListener的主题在运行时必须是常量,所以在子类中定义了@KafkaListener注解的方法,除了将其参数传递给基类中的方法。

这很好用,但我想知道是否有更优雅的解决方案。我假设我的基类必须以编程方式创建一个监听器容器,但在快速查看 KafkaListenerAnnotationBeanPostProcessor 之后, 它似乎很复杂。

有没有人有任何推荐?

最佳答案

在寻求实现类似的东西时偶然发现了这个问题,我首先从 Artem Bilan 的 answer 着手。 .但是,这不起作用,因为默认情况下,注释不会在子类中继承,除非它们本身使用 @Inherited 进行注释。尽管如此,可能仍然有一种方法可以使注释方法起作用,如果我让它起作用,我会更新这个答案。值得庆幸的是,尽管我已经使用 Kafka 监听器的程序注册实现了预期的行为。

我的代码是这样的:

接口(interface):

public interface GenericKafkaListener {

String METHOD = "handleMessage";

void handleMessage(ConsumerRecord<String, String> record);
}

抽象类:

public abstract class AbstractGenericKafkaListener implements GenericKafkaListener {

private final String kafkaTopic;

public AbstractGenericKafkaListener(final String kafkaTopic) {
this.kafakTopic = kafkaTopic;
}

@Override
public void handleMessage(final ConsumerRecord<String, String> record) {
//do common logic here
specificLogic(record);
}

protected abstract specificLogic(ConsumerRecord<String, String> record);

public String getKafkaTopic() {
return kafkaTopic;
}
}

然后我们可以通过编程方式在 KafkaListenerConfigurer 中注册所有类型为 AbstractGenericKafkaListener 的 bean:

@Configuration
public class KafkaListenerConfigurataion implements KafkaListenerConfigurer {

@Autowired
private final List<AbstractGenericKafkaListener> listeners;

@Autowired
private final BeanFactory beanFactory;

@Autowired
private final MessageHandlerMethodFactory messageHandlerMethodFactory;

@Autowired
private final KafkaListenerContainerFactory kafkaListenerContainerFactory;

@Value("${your.kafka.consumer.group-id}")
private String consumerGroup;

@Value("${your.application.name}")
private String service;

@Override
public void configureKafkaListeners(
final KafkaListenerEndpointRegistrar registrar) {

final Method listenerMethod = lookUpMethod();

listeners.forEach(listener -> {
registerListenerEndpoint(listener, listenerMethod, registrar);
});
}

private void registerListenerEndpoint(final AbstractGenericKafkaListener listener,
final Method listenerMethod,
final KafkaListenerEndpointRegistrar registrar) {

log.info("Registering {} endpoint on topic {}", listener.getClass(),
listener.getKafkaTopic());

final MethodKafkaListenerEndpoint<String, String> endpoint =
createListenerEndpoint(listener, listenerMethod);
registrar.registerEndpoint(endpoint);
}

private MethodKafkaListenerEndpoint<String, String> createListenerEndpoint(
final AbstractGenericKafkaListener listener, final Method listenerMethod) {

final MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setBeanFactory(beanFactory);
endpoint.setBean(listener);
endpoint.setMethod(listenerMethod);
endpoint.setId(service + "-" + listener.getKafkaTopic());
endpoint.setGroup(consumerGroup);
endpoint.setTopics(listener.getKafkaTopic());
endpoint.setMessageHandlerMethodFactory(messageHandlerMethodFactory);

return endpoint;
}

private Method lookUpMethod() {
return Arrays.stream(GenericKafkaListener.class.getMethods())
.filter(m -> m.getName().equals(GenericKafkaListener.METHOD))
.findAny()
.orElseThrow(() ->
new IllegalStateException("Could not find method " + GenericKafkaListener.METHOD));
}
}

关于java - spring-kafka中多个KafkaListener实现共享逻辑,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50325766/

46 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com