
java - Apache Kafka embedded Kafka JUnit test - the application starts when I run the unit test

Reposted. Author: 行者123. Updated: 2023-11-30 05:47:51

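I am developing an asynchronous mail server in Spring Boot using Kafka.

I have written tests with embedded Kafka, which starts its own Kafka broker and topic on a random port and uses them for the test.

When I start the test, the application context loads and expects a Kafka cluster on my local machine. I need to stop the application context from loading. I copied the code from https://github.com/code-not-found/spring-kafka/blob/master/spring-kafka-unit-test-classrule/src/test/java/com/codenotfound/kafka/producer/SpringKafkaSenderTest.java and it works absolutely fine there. When I follow the same style in my own project, I can see the actual application start.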

SpringKafkaSenderTest.java

package com.mailer.embeddedkafkatests;
import static org.junit.Assert.assertTrue;

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;
import org.springframework.kafka.test.rule.EmbeddedKafkaRule;
import org.springframework.kafka.test.utils.ContainerTestUtils;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import org.springframework.test.annotation.DirtiesContext;
import org.springframework.test.context.junit4.SpringRunner;

import com.mailer.model.Mail;
import com.mailer.producer.KafkaMessageProducer;
import com.mailer.serializer.MailSerializer;

@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
public class SpringKafkaSenderTest {

  private static final Logger LOGGER =
      LoggerFactory.getLogger(SpringKafkaSenderTest.class);

  private static String SENDER_TOPIC = "sender.t";

  @Autowired
  private KafkaMessageProducer sender;

  private KafkaMessageListenerContainer<String, Mail> container;

  private BlockingQueue<ConsumerRecord<String, Mail>> records;

  @ClassRule
  public static EmbeddedKafkaRule embeddedKafka =
      new EmbeddedKafkaRule(1, true, SENDER_TOPIC);

  @Before
  public void setUp() throws Exception {
    // set up the Kafka consumer properties
    Map<String, Object> consumerProperties =
        KafkaTestUtils.consumerProps("sender", "false",
            embeddedKafka.getEmbeddedKafka());
    consumerProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    // NOTE: the class configured here must implement Deserializer<Mail>;
    // this only works if MailSerializer does so in addition to Serializer<Mail>
    consumerProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MailSerializer.class);

    // create a Kafka consumer factory
    DefaultKafkaConsumerFactory<String, Mail> consumerFactory =
        new DefaultKafkaConsumerFactory<String, Mail>(
            consumerProperties);//, new StringDeserializer(), new JsonDeserializer<>(Mail.class));

    // set the topic that needs to be consumed
    ContainerProperties containerProperties =
        new ContainerProperties(SENDER_TOPIC);

    // create a Kafka MessageListenerContainer
    container = new KafkaMessageListenerContainer<>(consumerFactory,
        containerProperties);

    // create a thread-safe queue to store the received messages
    records = new LinkedBlockingQueue<>();

    // set up a Kafka message listener
    container
        .setupMessageListener(new MessageListener<String, Mail>() {
          @Override
          public void onMessage(
              ConsumerRecord<String, Mail> record) {
            LOGGER.debug("test-listener received message='{}'",
                record.toString());
            records.add(record);
          }
        });

    // start the container and underlying message listener
    container.start();

    // wait until the container has the required number of assigned partitions
    ContainerTestUtils.waitForAssignment(container,
        embeddedKafka.getEmbeddedKafka().getPartitionsPerTopic());
  }

  @After
  public void tearDown() {
    // stop the container
    container.stop();
  }

  @Test
  public void testSend() throws InterruptedException {
    // send the message
    Mail mail = new Mail();
    mail.setFrom("vinoth@local.com");
    sender.sendMessage(mail);
    Thread.sleep(4000);
    // check that the message was received (poll returns null on timeout)
    ConsumerRecord<String, Mail> received =
        records.poll(10, TimeUnit.SECONDS);
    // plain JUnit assertion on the value; the Hamcrest variant is commented out below
    assertTrue(received.value().getFrom().equals(mail.getFrom()));
    System.out.println(received.value().getFrom());
    // Hamcrest matcher to check the value
    // assertThat(received, hasValue(mail));
    // AssertJ condition to check the key
    // assertThat(received).has(key(null));
  }
}

Best answer

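Why do you want to stop the Spring context from loading? Isn't the purpose of this JUnit test to test your Spring application?

In any case, just remove the @SpringBootTest annotation and the Spring context will not be loaded.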
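Note that the test above injects KafkaMessageProducer with @Autowired, so without a Spring context there would be nothing to inject it from. An alternative that keeps @SpringBootTest is to point the application at the embedded broker instead of a local cluster. The fragment below is a minimal sketch, not the asker's actual configuration: it assumes the application reads its broker list from the standard Spring Boot property spring.kafka.bootstrap-servers, and the file location is the usual test-resources path rather than something shown in the question. It relies on the embedded broker publishing its address in the system property spring.embedded.kafka.brokers, which the @ClassRule should have set by the time the context is created.

```properties
# src/test/resources/application.properties (assumed location, test-only override)
# The embedded broker exposes its host:port list as the system property
# spring.embedded.kafka.brokers once it has started.
# Assumption: the application takes its brokers from spring.kafka.bootstrap-servers.
spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}
```

If the project uses a custom property name for the broker address (as the linked codenotfound example does with its own key), the same placeholder would go on that key instead.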

Regarding "java - Apache Kafka embedded Kafka JUnit test - the application starts when I run the unit test", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/54537826/
