gpt4 book ai didi

junit - 如何使用嵌入在 spring 云流中的 kafka 创建单元测试

转载 作者:行者123 更新时间:2023-12-04 16:58:12 30 4
gpt4 key购买 nike

抱歉,这个问题太笼统了,但有人有一些关于如何使用 kafka 嵌入式执行生产者和消费者测试的教程或指南。我尝试了几个,但有几个版本的依赖项,但没有一个真正有效=/

我正在使用 spring 云流 kafka。

最佳答案

我们通常建议使用 Test Binder在测试中,但如果您想使用嵌入式 kafka 服务器,则可以完成...

将此添加到您的 POM...

<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>

测试应用...
@SpringBootApplication
@EnableBinding(Processor.class)
public class So43330544Application {

public static void main(String[] args) {
SpringApplication.run(So43330544Application.class, args);
}

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public byte[] handle(byte[] in){
return new String(in).toUpperCase().getBytes();
}

}

应用程序. 属性...
spring.cloud.stream.bindings.output.destination=so0544out
spring.cloud.stream.bindings.input.destination=so0544in
spring.cloud.stream.bindings.output.producer.headerMode=raw
spring.cloud.stream.bindings.input.consumer.headerMode=raw
spring.cloud.stream.bindings.input.group=so0544

测试用例...
@RunWith(SpringRunner.class)
@SpringBootTest
public class So43330544ApplicationTests {

@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1);

@Autowired
private KafkaTemplate<byte[], byte[]> template;

@Autowired
private KafkaProperties properties;

@BeforeClass
public static void setup() {
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
}

@Test
public void testSendReceive() {
template.send("so0544in", "foo".getBytes());
Map<String, Object> configs = properties.buildConsumerProperties();
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<byte[], byte[]> cf = new DefaultKafkaConsumerFactory<>(configs);
Consumer<byte[], byte[]> consumer = cf.createConsumer();
consumer.subscribe(Collections.singleton("so0544out"));
ConsumerRecords<byte[], byte[]> records = consumer.poll(10_000);
consumer.commitSync();
assertThat(records.count()).isEqualTo(1);
assertThat(new String(records.iterator().next().value())).isEqualTo("FOO");
}

}

关于junit - 如何使用嵌入在 spring 云流中的 kafka 创建单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/43330544/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com