gpt4 book ai didi

spring-boot - 带有 Spring Boot 的简单嵌入式 Kafka 测试示例

转载 作者:行者123 更新时间:2023-12-03 11:21:28 26 4
gpt4 key购买 nike

编辑仅供引用:working gitHub example

我在互联网上搜索,找不到嵌入式 Kafka 测试的有效且简单的示例。
我的设置是:

  • Spring Boot
  • 多个 @KafkaListener一节课有不同的主题
  • 用于测试的嵌入式 Kafka 开始正常
  • 使用发送到主题的 Kafka 模板进行测试,但
    @KafkaListener即使经过很长的 sleep 时间,方法也没有收到任何东西
  • 不显示警告或错误,日志中仅显示来自 Kafka 的垃圾信息

  • 请帮我。大多存在过度配置或过度设计的示例。我相信它可以简单地完成。
    多谢你们!
    @Controller
    public class KafkaController {

    private static final Logger LOG = getLogger(KafkaController.class);

    @KafkaListener(topics = "test.kafka.topic")
    public void receiveDunningHead(final String payload) {
    LOG.debug("Receiving event with payload [{}]", payload);
    //I will do database stuff here which i could check in db for testing
    }
    }

    私有(private)静态字符串 SENDER_TOPIC = "test.kafka.topic";
    @ClassRule
    public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, SENDER_TOPIC);

    @Test
    public void testSend() throws InterruptedException, ExecutionException {

    Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);

    KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 0, "message00")).get();
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 0, 1, "message01")).get();
    producer.send(new ProducerRecord<>(SENDER_TOPIC, 1, 0, "message10")).get();
    Thread.sleep(10000);
    }

    最佳答案

    嵌入式 Kafka 测试适用于以下配置,

    测试类注释

    @EnableKafka
    @SpringBootTest(classes = {KafkaController.class}) // Specify @KafkaListener class if its not the same class, or not loaded with test config
    @EmbeddedKafka(
    partitions = 1,
    controlledShutdown = false,
    brokerProperties = {
    "listeners=PLAINTEXT://localhost:3333",
    "port=3333"
    })
    public class KafkaConsumerTest {
    @Autowired
    KafkaEmbedded kafkaEmbeded;

    @Autowired
    KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry;

    设置方法的注释之前

    @Before
    public void setUp() throws Exception {
    for (MessageListenerContainer messageListenerContainer : kafkaListenerEndpointRegistry.getListenerContainers()) {
    ContainerTestUtils.waitForAssignment(messageListenerContainer,
    kafkaEmbeded.getPartitionsPerTopic());
    }
    }

    注意:我没有使用 @ClassRule用于创建嵌入式 Kafka 而不是 Autowiring @Autowired embeddedKafka
    @Test
    public void testReceive() throws Exception {
    kafkaTemplate.send(topic, data);
    }

    希望这可以帮助!

    编辑:用 @TestConfiguration 标记的测试配置类

    @TestConfiguration
    public class TestConfig {

    @Bean
    public ProducerFactory<String, String> producerFactory() {
    return new DefaultKafkaProducerFactory<>(KafkaTestUtils.producerProps(kafkaEmbedded));
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
    KafkaTemplate<String, String> kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setDefaultTopic(topic);
    return kafkaTemplate;
    }

    现在 @Test方法将 Autowiring KafkaTemplate 并使用是发送消息

    kafkaTemplate.send(topic, data);

    用上面的行更新了答案代码块

    关于spring-boot - 带有 Spring Boot 的简单嵌入式 Kafka 测试示例,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/48753051/

    26 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com