gpt4 book ai didi

spring-cloud-stream - 如何使用 Kafka DSL 对 Spring Cloud Stream 进行单元测试

转载 作者:行者123 更新时间:2023-12-04 08:10:19 24 4
gpt4 key购买 nike

我正在尝试(单元)测试使用 Kafka DSL 的 Spring Cloud Stream Kafka 处理器,但收到以下错误“Connection to node -1 could not be established. Broker may not be available.”。此外,测试不会关闭。
我尝试了 EmbeddedKafka 和 TestBinder,但我有相同的行为。
我试图从reponse given by Spring Cloud Team开始(有效)并且我调整了该应用程序以使用 Kafka DSL,并且几乎按原样保留了测试类。 EmbeddedKafka 真的支持 Kafka DSL 吗?

我正在使用 Elmhurst.RELEASE

@SpringBootApplication
@EnableBinding(MyBinding.class)
public class So43330544Application {

public static void main(String[] args) {
SpringApplication.run(So43330544Application.class, args);
}

@StreamListener
@SendTo(MyBinding.OUTPUT)
public KStream<String,String> process(@Input(MyBinding.INPUT) KStream<String, String> in) {

return in.peek((k,v) -> System.out.println("Received value " +v ))
.mapValues(v -> v.toUpperCase());
}
}

interface MyBinding {

String INPUT = "input";
String OUTPUT = "output";

@Input(INPUT)
KStream<String, String> messagesIn();

@Output(OUTPUT)
KStream<String, String> messagesOut();
}

更新

如以下示例所示, this answer 中提出的方法当我使用 Spring Cloud Stream 通用语法编写事件处理器时对我有用,但在我使用 Kafka DSL (KStreams) 时不起作用。要查看行为的差异,只需切换到 ExampleAppWorkingExampleAppNotWorking@SpringBootTest注释:
@RunWith(SpringRunner.class)
@SpringBootTest(classes=ExampleKafkaEmbeddedTest.ExampleAppNotWorking.class)
@DirtiesContext(classMode=ClassMode.AFTER_EACH_TEST_METHOD)
public class ExampleKafkaEmbeddedTest {
@ClassRule
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, false, "so0544in","so0544out");

@Autowired
private KafkaTemplate<Integer, byte[]> template;

@Autowired
private KafkaProperties properties;

private static Consumer consumer;

@BeforeClass
public static void setup() throws Exception{
System.setProperty("spring.kafka.bootstrap-servers", embeddedKafka.getBrokersAsString());
System.setProperty("spring.cloud.stream.kafka.binder.zkNodes", embeddedKafka.getZookeeperConnectionString());
System.setProperty("server.port","0");
System.setProperty("spring.jmx.enabled" , "false");

Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("group-id", "false", embeddedKafka);

consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
DefaultKafkaConsumerFactory<String, String> cf = new DefaultKafkaConsumerFactory<>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAnEmbeddedTopic(consumer, "so0544out");

}

@After
public void tearDown() {
if (consumer != null){
consumer.close();
}
}

@Test
public void testSendReceive() {
template.send("so0544in", "foo".getBytes());

Map<String, Object> configs = properties.buildConsumerProperties();
configs.put(ConsumerConfig.GROUP_ID_CONFIG, "test0544");
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

ConsumerRecord<String, String> cr = KafkaTestUtils.getSingleRecord(consumer, "so0544out");

System.out.println("Contenu chaine resultat : " + cr.value());

assertEquals(cr.value(), "FOO");
}

@SpringBootApplication
@EnableBinding(Processor.class)
public static class ExampleAppWorking {

public static void main(String[] args) {
SpringApplication.run(ExampleAppWorking.class, args);
}

@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public String receive(String in) {
return in.toUpperCase();
}
}

@SpringBootApplication
@EnableBinding(MyBinding.class)
public static class ExampleAppNotWorking {

public static void main(String[] args) {
SpringApplication.run(ExampleAppNotWorking.class, args);
}

@StreamListener
@SendTo(MyBinding.OUTPUT)
public KStream<Integer,byte[]> toUpperCase (@Input(MyBinding.INPUT) KStream<Integer,byte[]> in){
return in.map((key, val) -> KeyValue.pair(key, new String(val).toUpperCase().getBytes()));
}
}

public interface MyBinding {
String INPUT = "input";
String OUTPUT = "output";

@Input(INPUT)
KStream<Integer, String> messagesIn();

@Input(OUTPUT)
KStream<Integer, String> messagesOut();
}

}

最佳答案

EmbeddedKafa应该与 Kafka Streams 一起使用。所有这些 tests使用 EmbeddedKafa 进行测试。您可以将这些测试中使用的模式作为您自己测试的模板。

查看您在下面的评论中提供的代码。您需要在您的 setup 中添加此属性方法。
System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafka.getBrokersAsString());
主要的 Spring Boot 应用程序期望 Kafka 代理在 localhost 上可用,并且它不知道测试正在运行嵌入式代理。我们需要通过从测试中设置该属性来明确这一事实,以便主引导应用程序正确检测嵌入式 kafka 代理。

关于spring-cloud-stream - 如何使用 Kafka DSL 对 Spring Cloud Stream 进行单元测试,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50628979/

24 4 0