gpt4 book ai didi

java - 如何在 kafka 中初始化 kafka ConsumerRecords 进行测试

转载 作者:搜寻专家 更新时间:2023-10-31 19:44:09 24 4
gpt4 key购买 nike

我正在为 kafka 消费者组件编写测试用例并模拟 kafkaConsumer.poll()返回 ConsumerRecords<String,String> 的实例.我想初始化 ConsumerRecords并在模拟中使用它,但 ConsumerRecords 的构造函数期待我在测试中没有的实际 kafka 主题。我认为的一种方法是保留对象的序列化副本并反序列化以初始化 ConsumerRecords .有没有其他方法可以达到同样的效果。

最佳答案

这是一些示例代码(Kafka 客户端库版本 0.10.1.1):

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.MockConsumer;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;

...
String topic = "MyTopic";
Collection<TopicPartition> partitions = new ArrayList<TopicPartition>();
Collection<String> topicsCollection = new ArrayList<String>();
partitions.add(new TopicPartition(topic, 1));
Map<TopicPartition, Long> partitionsBeginningMap = new HashMap<TopicPartition, Long>();
Map<TopicPartition, Long> partitionsEndMap = new HashMap<TopicPartition, Long>();

long records = 10;
for (TopicPartition partition : partitions) {
partitionsBeginningMap.put(partition, 0l);
partitionsEndMap.put(partition, records);
topicsCollection.add(partition.topic());
}

MockConsumer<String, MyObject> second = new MockConsumer<String, MyObject>(
OffsetResetStrategy.EARLIEST);
second.subscribe(topicsCollection);
second.rebalance(partitions);
second.updateBeginningOffsets(partitionsBeginningMap);
second.updateEndOffsets(partitionsEndMap);
for (long i = 0; i < 10; i++) {
MyObject value = Generator.generate();
ConsumerRecord<String, MyObject> record = new ConsumerRecord<String, MyObject>(
topic, 1, i, null,value);
second.addRecord(record);
}
...

关于java - 如何在 kafka 中初始化 kafka ConsumerRecords<String,String> 进行测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39716746/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com