gpt4 book ai didi

java - Spring Kafka 与嵌入式 Kafka 集成测试

转载 作者:行者123 更新时间:2023-11-30 01:55:13 25 4
gpt4 key购买 nike

我有一个 Spring Boot 应用程序,它有一个消费者从一个集群中的主题消费并生成到不同集群中的另一个主题。

现在我正在尝试使用 spring 嵌入式 Kafka 编写集成测试用例,但遇到问题 KafkaTemplate 无法注册。具有该名称的 bean 已在类路径资源中定义

消费阶层

@Service
public class KafkaConsumerService {

@Autowired
private KafkaProducerService kafkaProducerService;

@KafkaListener(topics = "${kafka.producer.topic}")
public void professor(List<Professor> pro) {
pro.forEach(kafkaProducerService::produce);

}

}

制作人等级

@Service
public class KafkaProducerService {

@Value("${kafka.producer.topic}")
private String topic;

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

public void produce(Professor pro) {
kafkaTemplate.send(topic,"professor",pro);
}

}

在我的测试用例中,我想重写KafkaTemplate,以便当我在Test中调用kafkaConsumerService.professor方法时,它应该将数据生成到嵌入式 Kafka,我应该验证它。

测试配置

@TestConfiguration
@EmbeddedKafka(partitions = 1, controlledShutdown = false,
brokerProperties = {"listeners=PLAINTEXT://localhost:3333", "port=3333"})
public class KafkaProducerConfigTest {

@Autowired
KafkaEmbedded kafkaEmbeded;

@Autowired
KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

@Before
public void setUp() throws Exception {
for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
ContainerTestUtils.waitForAssignment(messageListenerContainer,
kafkaEmbeded.getPartitionsPerTopic());
}
}

@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbeded));
}

@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
KafkaTemplate<String, Object> kafkaTemplate = new KafkaTemplate<>(producerFactory());
return kafkaTemplate;
}

}

测试类

@EnableKafka
@SpringBootTest(classes = {KafkaProducerConfigTest.class})
@RunWith(SpringRunner.class)
public class KafkaProducerServiceTest {

@Autowired
private KafkaConsumerService kafkaConsumerService;

@Test
public void testReceive() throws Exception {
kafkaConsumerService.professor(Arrays.asList(new Professor()));

//How to check messages is sent to kafka?
}

}

错误

 The bean 'kafkaTemplate', defined in com.kafka.configuration.KafkaProducerConfigTest, could not be registered. 
A bean with that name has already been defined in class path resource [com/kafka/configuration/KafkaProducerConfig.class] and overriding is disabled.
Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true

有人可以帮助我如何验证发送到嵌入式 Kafka 服务器的消息吗?

注意我收到了一些已弃用的警告

The type KafkaEmbedded is deprecated

The method getPartitionsPerTopic() from the type KafkaEmbedded is deprecated

The method producerProps(KafkaEmbedded) from the type KafkaTestUtils is deprecated

最佳答案

启动2.1 disables bean overriding by default .

Bean overriding has been disabled by default to prevent a bean being accidentally overridden. If you are relying on overriding, you will need to set spring.main.allow-bean-definition-overriding to true.

关于弃用;请参阅 @EmbeddedKafka 的 javadocs。它被 EmbeddedKafkaBroker 取代。

关于java - Spring Kafka 与嵌入式 Kafka 集成测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54754662/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com