gpt4 book ai didi

java - Spring Kafka Embedded - 测试之间已经存在主题

转载 作者:行者123 更新时间:2023-12-02 09:05:10 24 4
gpt4 key购买 nike

我创建了一组带有嵌入式kafka(spring-kafka-test)的测试(JUnit 5),当我有时(并非总是)运行它们时,我得到“主题'some_name'已经存在 “在单次运行中进行一项或多项测试。

所有测试都使用相同的主题名称(我不想为每个测试更改该名称),测试类具有 DirtiesContext 注释(AFTER_EACH_TEST_METHOD)。我不确定这个问题的原因是什么以及如何解决。

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
@ActiveProfiles("test")
public class RemovalKafkaTestIT {
private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);
private final static String SERVER_ADDRES = "127.0.0.1:9092";

private Consumer<String, String> prepareConsumer() {
Map<String, Object> configsConsumer = new HashMap<>(KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
configsConsumer.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configsConsumer.put("bootstrap.servers", SERVER_ADDRES);
Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(configsConsumer, new StringDeserializer(), new StringDeserializer()).createConsumer();
consumer.subscribe(singleton("some_name"));
return consumer;
}

@Test
public void someMethodWithKafka1() {
// some logic
...
// check topic content
Consumer<String, String> consumer = this.prepareConsumer();
embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");

ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
assertThat(records.count()).isEqualTo(1); // and other checks :)

// clean
consumer.commitSync();
consumer.close();
}

@Test
public void someMethodWithKafka2() {
// some other logic
...
// check topic content
Consumer<String, String> consumer = this.prepareConsumer();
embeddedKafkaBroker.consumeFromEmbeddedTopics(consumer, "some_name");

ConsumerRecords<String, String> records = KafkaTestUtils.getRecords(consumer);
assertThat(records.count()).isEqualTo(1); // and other checks :)

// clean
consumer.commitSync();
consumer.close();
}
}

最佳答案

您有两名经纪人;您自己创建的一个:

private EmbeddedKafkaBroker embeddedKafkaBroker = new EmbeddedKafkaBroker(1, true, TOPIC);

还有一个由 Spring 管理:

@SpringBootTest
@EmbeddedKafka(partitions = 1, topics = {"some_name"}, ports = 9092)

当您在 Spring 测试上下文中使用 @EmbeddedKafka 时;代理已添加到上下文中。

更改为

@Autowired
private EmbeddedKafkaBroker embeddedKafkaBroker;

并且不要添加其他 bean。

通常,为每个测试使用不同的主题会更容易(也更快);避免为每个测试创建一个代理。

编辑

ports = 9092

使用随机端口(省略此配置)并使用

configsConsumer.put("bootstrap.servers", this.embeddedKafkaBroker.getBrokersAsString());

关于java - Spring Kafka Embedded - 测试之间已经存在主题,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59877538/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com