gpt4 book ai didi

java - 如何为@KafkaListener 编写单元测试?

转载 作者:行者123 更新时间:2023-12-04 05:31:31 24 4
gpt4 key购买 nike

试图弄清楚我是否可以使用 spring-kafka 和 spring-kafka-test 为 @KafkaListener 编写单元测试。

我的听众课。

    public class MyKafkaListener {
@Autowired
private MyMessageProcessor myMessageProcessor;

@KafkaListener(topics = "${kafka.topic.01}", groupId = "SF.CLIENT", clientIdPrefix = "SF.01", containerFactory = "myMessageListenerContainerFactory")
public void myMessageListener(MyMessage message) {
myMessageProcessor.process(message);
log.info("MyMessage processed");
}}

我的测试课:

    @RunWith(SpringRunner.class)
@DirtiesContext
@EmbeddedKafka(partitions = 1, topics = {"I1.Topic.json.001"})
@ContextConfiguration(classes = {TestKafkaConfig.class})
public class MyMessageConsumersTest {

@Autowired
private MyMessageProcessor myMessageProcessor;

@Value("${kafka.topic.01}")
private String TOPIC_01;

@Autowired
private KafkaTemplate<String, MyMessage> messageProducer;

@Test
public void testSalesforceMessageListner() {
MyMessageConsumers myMessageConsumers = new MyMessageConsumers(mockService);
messageProducer.send(TOPIC_01, "MessageID", new MyMessage());
verify(myMessageProcessor, times(1)).process(any(MyMessage.class));
}}

我的测试配置类:

    @Configuration
@EnableKafka
public class TestKafkaConfig {
@Bean
public MyMessageProcessor myMessageProcessor() {
return mock(MyMessageProcessor.class);
}
@Bean
public KafkaEmbedded kafkaEmbedded() {
return new KafkaEmbedded(1, true, 1, "I1.Topic.json.001");
}

//Consumer
@Bean
public ConsumerFactory<String, MyMessage> myMessageConsumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), new JsonDeserializer<>(MyMessage.class));
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, MyMessage> myMessageListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, MyMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(myMessageConsumerFactory());
return factory;
}

//Producer
@Bean
public ProducerFactory<String, MyMessage> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaEmbedded().getBrokersAsString());
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaMessageSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, MyMessage> messageProducer() {
return new KafkaTemplate<>(producerFactory());
}
}


有什么简单的方法可以完成这项工作吗?

或者我应该以其他方式对@KafkaListener 进行测试?在单元测试中,如何确保在新消息到达 Kafka 时调用 @KafkaListener。

最佳答案

how do I ensure @KafkaListener is invoked when a new message is arrived in Kafka.



好吧,这本质上是框架的责任来测试这样的功能。在您的情况下,您只需要专注于业务逻辑和单元测试您的自定义代码,而不是在框架中编译的代码。另外没有goo点测试 @KafkaListener仅记录传入消息的方法。找到测试用例验证的钩子(Hook)肯定太难了。

另一方面,我真的相信您的 @KafkaListener 中的业务逻辑方法比你展示的要复杂得多。因此,最好验证从该方法调用的自定义代码(例如 DB 插入、其他一些服务调用等),而不是尝试准确找出 myMessageListener() 的 Hook 。 .

您如何使用 mock(MyMessageProcessor.class)确实是业务逻辑验证的好方法。只有代码中的错误在于 EmbeddedKafka 的重复。 : 你使用了一个注解并且你还声明了一个 @Bean在配置中。您应该考虑删除其中一个。尽管尚不清楚您的生产代码在哪里,但它确实不受嵌入式 Kafka 的影响。否则,如果一切都在测试范围内,我看不出您的消费者和生产者工厂配置有任何问题。您肯定对 @KafkaListener 有一个最小的可能配置和 KafkaTemplate .您只需要删除 @EmbeddedKafka不要启动代理两次。

关于java - 如何为@KafkaListener 编写单元测试?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/52783066/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com