
junit - How do I instantiate a mock Kafka topic for JUnit tests?


I have some JUnit tests on code that uses a Kafka topic. The mock Kafka topics I have tried do not work, and the examples I found online are very old, so they don't work with 0.8.2.1 either. How do I create a mock Kafka topic using 0.8.2.1?

To clarify: I'm choosing to use an actual embedded instance of the topic so that I test against a real instance rather than mocking the hand-off in Mockito. That way I can verify that my custom encoders and decoders actually work and that things won't fail once I switch to a real Kafka instance.
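
For context, custom encoders and decoders under the 0.8.x API implement kafka.serializer.Encoder and kafka.serializer.Decoder. A minimal illustrative sketch of such a codec (the class name Utf8StringCodec and its logic are made up for this example, not the actual code under test):

import java.nio.charset.StandardCharsets;
import kafka.serializer.Decoder;
import kafka.serializer.Encoder;
import kafka.utils.VerifiableProperties;

// Illustrative codec only; a real one would carry the application's own domain type.
public class Utf8StringCodec implements Encoder<String>, Decoder<String> {

    // Kafka 0.8.x instantiates the class named by "serializer.class" reflectively
    // and passes it a VerifiableProperties, so this constructor must exist.
    public Utf8StringCodec(VerifiableProperties props) {
    }

    @Override
    public byte[] toBytes(String message) {
        return message.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String fromBytes(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }
}

Such a codec would be wired in through the producer's serializer.class property and, on the consumer side, the createMessageStreams overload that takes key and value decoders.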

Best Answer

https://gist.github.com/asmaier/6465468#file-kafkaproducertest-java

This example has been updated to work with the newer 0.8.2.2 release. Here is the code snippet along with the Maven dependencies:

pom.xml:

<dependencies>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.11</artifactId>
        <version>0.8.2.2</version>
        <classifier>test</classifier>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>0.8.2.2</version>
    </dependency>
</dependencies>

KafkaProducerTest.java:

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.I0Itec.zkclient.ZkClient;
import org.junit.Test;
import kafka.admin.TopicCommand;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.producer.KeyedMessage;
import kafka.producer.Producer;
import kafka.producer.ProducerConfig;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.MockTime;
import kafka.utils.TestUtils;
import kafka.utils.TestZKUtils;
import kafka.utils.Time;
import kafka.utils.ZKStringSerializer$;
import kafka.zk.EmbeddedZookeeper;
import static org.junit.Assert.*;

/**
 * For online documentation
 * see
 * https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/utils/TestUtils.scala
 * https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/admin/TopicCommand.scala
 * https://github.com/apache/kafka/blob/0.8.2/core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala
 */
public class KafkaProducerTest {

    private int brokerId = 0;
    private String topic = "test";

    @Test
    public void producerTest() throws InterruptedException {

        // setup Zookeeper
        String zkConnect = TestZKUtils.zookeeperConnect();
        EmbeddedZookeeper zkServer = new EmbeddedZookeeper(zkConnect);
        ZkClient zkClient = new ZkClient(zkServer.connectString(), 30000, 30000, ZKStringSerializer$.MODULE$);

        // setup Broker
        int port = TestUtils.choosePort();
        Properties props = TestUtils.createBrokerConfig(brokerId, port, true);

        KafkaConfig config = new KafkaConfig(props);
        Time mock = new MockTime();
        KafkaServer kafkaServer = TestUtils.createServer(config, mock);

        // create topic
        String[] arguments = new String[]{"--topic", topic, "--partitions", "1", "--replication-factor", "1"};
        TopicCommand.createTopic(zkClient, new TopicCommand.TopicCommandOptions(arguments));

        List<KafkaServer> servers = new ArrayList<KafkaServer>();
        servers.add(kafkaServer);
        TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 5000);

        // setup producer
        Properties properties = TestUtils.getProducerConfig("localhost:" + port);
        ProducerConfig producerConfig = new ProducerConfig(properties);
        Producer producer = new Producer(producerConfig);

        // setup simple consumer
        Properties consumerProperties = TestUtils.createConsumerProperties(zkServer.connectString(), "group0", "consumer0", -1);
        ConsumerConnector consumer = kafka.consumer.Consumer.createJavaConsumerConnector(new ConsumerConfig(consumerProperties));

        // send message
        KeyedMessage<Integer, byte[]> data = new KeyedMessage(topic, "test-message".getBytes(StandardCharsets.UTF_8));

        List<KeyedMessage> messages = new ArrayList<KeyedMessage>();
        messages.add(data);

        producer.send(scala.collection.JavaConversions.asScalaBuffer(messages));
        producer.close();

        // deleting zookeeper information to make sure the consumer starts from the beginning
        // see https://stackoverflow.com/questions/14935755/how-to-get-data-from-old-offset-point-in-kafka
        zkClient.delete("/consumers/group0");

        // starting consumer
        Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
        topicCountMap.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
        KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
        ConsumerIterator<byte[], byte[]> iterator = stream.iterator();

        if (iterator.hasNext()) {
            String msg = new String(iterator.next().message(), StandardCharsets.UTF_8);
            System.out.println(msg);
            assertEquals("test-message", msg);
        } else {
            fail();
        }

        // cleanup
        consumer.shutdown();
        kafkaServer.shutdown();
        zkClient.close();
        zkServer.shutdown();
    }
}

Be sure to check your mvn dependency:tree for any conflicting libraries. I had to add exclusions for slf4j and log4j:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>0.8.2.2</version>
    <classifier>test</classifier>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.8.2.2</version>
    <exclusions>
        <exclusion>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
        </exclusion>
        <exclusion>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Another option I'm looking into is using Apache Curator: Is it possible to start a zookeeper server instance in process, say for unit tests?

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.2.0-incubating</version>
    <scope>test</scope>
</dependency>

import java.io.IOException;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.RetryOneTime;
import org.apache.curator.test.TestingServer;
import org.junit.After;
import org.junit.Before;

TestingServer zkTestServer;
CuratorFramework cli;

@Before
public void startZookeeper() throws Exception {
    zkTestServer = new TestingServer(2181);
    cli = CuratorFrameworkFactory.newClient(zkTestServer.getConnectString(), new RetryOneTime(2000));
}

@After
public void stopZookeeper() throws IOException {
    cli.close();
    zkTestServer.stop();
}
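
If you go the Curator route, the embedded broker from the test above can be pointed at the TestingServer instead of kafka.zk.EmbeddedZookeeper. A rough sketch, assuming you simply override the broker properties before building the KafkaConfig (zookeeper.connect is the standard 0.8.x broker setting):

// Inside the test, after startZookeeper() has run:
int port = TestUtils.choosePort();
Properties props = TestUtils.createBrokerConfig(brokerId, port, true);
// Point the broker at the Curator TestingServer instead of EmbeddedZookeeper.
props.setProperty("zookeeper.connect", zkTestServer.getConnectString());

KafkaConfig config = new KafkaConfig(props);
KafkaServer kafkaServer = TestUtils.createServer(config, new MockTime());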

Regarding junit - How to instantiate a mock Kafka topic for junit tests?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/33748328/
