
java - How to functionally test Kafka Streams with Avro (schemaRegistry)?


  • Brief summary of what I want to achieve: I want to functionally test a Kafka Streams topology over Avro records (using TopologyTestDriver).

  • Problem: I can't "mock" the schemaRegistry so that schema publishing/reading happens automatically.

What I have tried so far is to use MockSchemaRegistryClient to mock the schemaRegistry, but I can't figure out how to link it to the Avro Serde.

Code

public class SyncronizerIntegrationTest {

    private ConsumerRecordFactory<String, Tracking> recordFactory =
            new ConsumerRecordFactory<>(new StringSerializer(), new SpecificAvroSerializer<>());

    MockSchemaRegistryClient mockSchemaRegistryClient = new MockSchemaRegistryClient();

    @Test
    void integrationTest() throws IOException, RestClientException {

        Properties props = new Properties();
        props.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "streamsTest");
        props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        props.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.setProperty(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, GenericAvroSerde.class.getName());
        props.setProperty(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081"); // Dunno if this does anything? :/

        StreamsBuilder kStreamBuilder = new StreamsBuilder();
        Serde<Tracking> avroSerde = getAvroSerde();
        mockSchemaRegistryClient.register(Tracking.getClassSchema().getName(), Tracking.getClassSchema());

        KStream<String, Tracking> unmappedOrdersStream = kStreamBuilder.stream(
                "topic",
                Consumed.with(Serdes.String(), avroSerde));

        unmappedOrdersStream
                .filter((k, v) -> v != null)
                .to("ouput");

        Topology topology = kStreamBuilder.build();
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, props);

        testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));
    }
}

AvroSerde method

private <T extends SpecificRecord> Serde<T> getAvroSerde() {

    // Configure Avro ser/des
    final Map<String, String> avroSerdeConfig = new HashMap<>();
    avroSerdeConfig.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");

    final Serde<T> avroSerde = new SpecificAvroSerde<>();
    avroSerde.configure(avroSerdeConfig, false); // `false` for record values
    return avroSerde;
}
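
For context, Confluent's SpecificAvroSerde also exposes a constructor that accepts a SchemaRegistryClient, which is one way to hand the MockSchemaRegistryClient to the serde directly. The following is only a minimal sketch of that wiring (it is not part of the original question): configure() with a dummy URL is still required, but the URL should never be contacted because the mock client is used.

import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;
import io.confluent.kafka.serializers.KafkaAvroSerializerConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serde;

import java.util.Collections;
import java.util.Map;

public class MockRegistrySerdeSketch {

    // Sketch: build a SpecificAvroSerde backed by a MockSchemaRegistryClient
    // instead of a real schema registry reached over HTTP.
    static <T extends SpecificRecord> Serde<T> avroSerdeWith(MockSchemaRegistryClient client) {
        final Serde<T> serde = new SpecificAvroSerde<>(client);
        // configure() is still needed; the dummy URL is not contacted because the mock client is passed in.
        final Map<String, String> config = Collections.singletonMap(
                KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://mock:8081");
        serde.configure(config, false); // `false` for record values
        return serde;
    }
}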

If I run the test without testDriver.pipeInput(recordFactory.create("topic", "1", createValidMappedTracking()));, it runs fine (everything appears to be wired up correctly).

But

when I try to insert data (pipeInput), it throws the following exception. The "Tracking" object is fully populated.

org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.lang.NullPointerException
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:65)
at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.serialize(SpecificAvroSerializer.java:38)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:184)
at org.apache.kafka.streams.test.ConsumerRecordFactory.create(ConsumerRecordFactory.java:270)

Edited: I didn't delete this, because the "history log" shows the path that was followed.

Best Answer

Confluent provides plenty of example code for testing Kafka (Streams) together with the Schema Registry.

https://github.com/confluentinc/kafka-streams-examples/blob/5.0.0-post/src/test/java/io/confluent/examples/streams/SpecificAvroIntegrationTest.java

Most importantly, mocking is not a complete integration test; starting an actual Kafka broker with an in-memory schema registry is.

In the code above, see

@ClassRule
public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());
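
A condensed sketch of how those two pieces fit into a test class is shown below. The class and test names here are illustrative, and the embedded-cluster helper (EmbeddedSingleNodeKafkaCluster with its bootstrapServers() and schemaRegistryUrl() methods) comes from the linked kafka-streams-examples repository; the full, working version is the linked SpecificAvroIntegrationTest.

import io.confluent.examples.streams.kafka.EmbeddedSingleNodeKafkaCluster;
import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import org.junit.ClassRule;
import org.junit.Test;

import java.util.Properties;

public class SpecificAvroIntegrationTestSketch {

    // Starts an in-memory Kafka broker, ZooKeeper, and Schema Registry for the whole test class.
    @ClassRule
    public static final EmbeddedSingleNodeKafkaCluster CLUSTER = new EmbeddedSingleNodeKafkaCluster();

    @Test
    public void shouldRoundTripSpecificAvroRecords() throws Exception {
        final Properties streamsConfiguration = new Properties();
        streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "specific-avro-integration-test");
        streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class.getName());
        // Point the Avro serdes at the embedded, in-memory schema registry.
        streamsConfiguration.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, CLUSTER.schemaRegistryUrl());

        // ... build the topology, produce Avro input records, and verify the output,
        // as done in the linked SpecificAvroIntegrationTest.
    }
}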

Regarding java - How to functionally test Kafka Streams with Avro (schemaRegistry)?, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/52737242/
