gpt4 book ai didi

java - 为 Kafka Consumer 编写 JUnit 测试

转载 作者:行者123 更新时间:2023-12-01 12:11:50 26 4
gpt4 key购买 nike

我有一个订阅主题的 kafka 消费者。实现工作正常。但是当试图为此实现单元测试时,会出现问题,因为它是由 Runnable 实现的。界面。

实现

@Override
public void run() {
kafkaConsumer.subscribe(kafkaTopics);

while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(1000);
Map<String, InventoryStock> skuMap = new LinkedHashMap<>();

try {
// populating sku map with consumer record
for (ConsumerRecord<String, String> record : records) {
populateMap(skuMap, record.value());
}

if (MapUtils.isNotEmpty(skuMap)) {
// writing sku inventory with populated sku map
inventoryDao.updateInventoryTable(INVENTORY_JOB_ID, skuMap);
}
} catch (Exception e) {

}
kafkaConsumer.commitAsync();
}
}

我尝试使用 MockConsumer 实现测试.但是需要在实现中分配给消费者。但是消费者在实现中并没有暴露在外。这是我尝试过的。
@Before
public void onBefore() {
MockitoAnnotations.initMocks(this);

Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
skuInventoryConsumer = new SkuInventoryConsumer(consumerProps);

KafkaConsumer kafkaConsumerMock = mock(KafkaConsumer.class);

Whitebox.setInternalState(skuInventoryConsumer, "LOGGER", LOGGER);
Whitebox.setInternalState(skuInventoryConsumer, "kafkaConsumer", kafkaConsumerMock);

}

@Test
public void should_subscribe_on_topic() {
consumer.assign(Arrays.asList(new TopicPartition("my_topic", 0)));

HashMap<TopicPartition, Long> beginningOffsets = new HashMap<>();
beginningOffsets.put(new TopicPartition("my_topic", 0), 0L);
consumer.updateBeginningOffsets(beginningOffsets);

consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 0L, "mykey", "myvalue0"));
consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 1L, "mykey", "myvalue1"));
consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 2L, "mykey", "myvalue2"));
consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 3L, "mykey", "myvalue3"));
consumer.addRecord(new ConsumerRecord<>("my_topic", 0, 4L, "mykey", "myvalue4"));
}

既然是 runnable并且消费者没有暴露这个测试没有按预期工作。我该如何解决这个问题?

最佳答案

我建议使用 Mockito,如下例所示

    Consumer<String, String> kafkaConsumerLocal = mock(Consumer.class);
KafkaConsumer kafkaConsumer = spy(new KafkaConsumer("topic-name));

ReflectionTestUtils.setField(kafkaConsumer, "threadPoolCount", 1);
ReflectionTestUtils.setField(kafkaConsumer, "consumer", kafkaConsumerLocal);

doNothing().when(kafkaConsumer).runConsumer();
doNothing().when(kafkaConsumer).addShutDownHook();
doReturn(kafkaConsumerLocal).when(consumerInit).getKafkaConsumer();

关于java - 为 Kafka Consumer 编写 JUnit 测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50921061/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com