gpt4 book ai didi

mockito - 等到@KafkaListener 使用@EmbeddedKafka 在测试中完成处理消息

转载 作者:行者123 更新时间:2023-12-04 13:39:41 31 4
gpt4 key购买 nike

我有一个 @KafkaListener消费者并希望编写集成测试。
事实是,似乎很难找到方法 Consumer#consume 的确切时刻。在处理消息并且数据库中的某些状态已更改后,完成其执行以执行某些断言。

@Component
public class Consumer {

private final Service service;

@KafkaListener(id = "id", groupId = "group", topics = "topic", containerFactory = "factory")
public void consume(@Payload Message message, Acknowledgment acknowledgment) {
service.process(message);
acknowledgment.acknowledge();
}

}

测试
@SpringBootTest
@EmbeddedKafka
void class Testing {
// some useful beans

@SpyBean
private Consumer consumer;

@Test
void shoudConsume() throws Exception {
Message message = new Message();
String topic = "topic";
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
new KafkaProducer<String, String>(senderProps).send(new ProducerRecord<>(topic, message))
.get(1L, TimeUnit.SECONDS);

Mockito.verify(consumer, Mockito.timeout(1_000L)).consume(any(Message.class), any(Acknowledgment.class));
// perform some asserts
}

事实是,如果我把 Thread.sleep(1000L)消费者处理消息并且一切正常,但使用 Mockito 它不起作用,因为所有断言在消费者完成方法执行之前执行 Consumer#consume .

是否有机会(使用监听器等)捕获 @KafkaListener 的时机-消费者确认/完成消息处理以执行具有适当数据库状态的断言?需要进行集成测试以确保端到端功能正常工作。

我也试着做 #verify检查 @SpyBean private Service service , 方法 Service#process ,但它也不起作用。

最佳答案

如果你想做一些更简单的事情,你可以检查这些选项:

  • 使用Awaitility(当你只需要检查队列处理的效果时);
  •   @Test
    public void testMessageSendReceive_Awaitility() {
    producer.getMysource()
    .output()
    .send(MessageBuilder.withPayload("payload")
    .setHeader("type", "string")
    .build());

    waitAtMost(5, TimeUnit.SECONDS)
    .untilAsserted(() -> {
    then("payload").isEqualTo(
    EmbeddedKafkaAwaitilityTest.this.consumer.getReceivedMessage());
    });
    }
  • 编辑:如果您尝试这种方法,在不同的环境中可能会出现问题。看看这条评论:https://stackoverflow.com/a/65480474/10746857 .可能会有所帮助。
  • 使用 CountDownLach(例如,当您无法访问注入(inject)的监听器时,作为 @SpringBootTest 没有 @Autowired 您的类,这可能是一个坏主意);
  •   @Test
    public void testMessageSendReceive() throws InterruptedException {
    producer.getMysource()
    .output()
    .send(MessageBuilder.withPayload("payload")
    .setHeader("type", "string")
    .build());

    latch.await();
    assertThat(consumer.getReceivedMessage()).isEqualTo("payload");
    }
  • 您也可以创建一个 阻塞队列 (但我认为这不是一个好的选择)。
  • BlockingQueue<ConsumerRecord<String, String>> consumerRecords;
    consumerRecords = new LinkedBlockingQueue<>();
    consumerRecords.poll(10, TimeUnit.SECONDS);
    引用:
  • https://www.geekyhacker.com/2020/10/03/test-spring-kafka-consumer-and-producer-with-embeddedkafka/
  • https://github.com/Kevded/integration-test-spring-kafka-with-embedded-kafka-consumer-and-producer
  • https://www.baeldung.com/spring-boot-kafka-testing
  • https://medium.com/trendyol-tech/how-to-integration-test-on-spring-kafka-producer-cb9d1caf0795

  • 另一种方法(我没有测试):
  • https://www.geekyhacker.com/2020/10/03/test-spring-kafka-consumer-and-producer-with-embeddedkafka/
  • 关于mockito - 等到@KafkaListener 使用@EmbeddedKafka 在测试中完成处理消息,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/59307356/

    31 4 0
    Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
    广告合作:1813099741@qq.com 6ren.com