gpt4 book ai didi

java - 序列化 Avro 消息时出错 - Kafka 架构注册表

转载 作者:行者123 更新时间:2023-12-02 08:52:41 25 4
gpt4 key购买 nike

我正在创建一个 avro 类,其中包含一个字符串和一个 map 作为字段。我可以通过maven生成avro类,并且我能够在localhost:8081中创建一个注册表

.avsc 文件:

    {
"type":"record",
"name":"AvroClass",
"namespace":"belliPack.avro",
"fields":[
{
"name":"title",
"type":"string"
},
{
"name":"map",
"type": {"type": "map", "values": "double"}
}
]
}

架构注册表返回以下内容:$curl -X GET http://localhost:8081/subjects/teste1-value/versions/1

{"subject":"teste1-value","version":1,"id":42,"schema":"{"type":"record","name":"AvroClass","namespace":"belliPack.avro","fields":[{"name":"title","type":"string"},{"name":"map","type":{"type":"map","values":"double"}}]}"}

我的 Kafka Producer 类是:

public KafkaProducer<String, AvroClass> createKafkaProducer() {
String bootstrapServer = "127.0.0.1:9092";
String schemaRegistryURL = "127.0.0.1:8081";

//create Producer properties
Properties properties = new Properties();
//kafka documentation>producer configs
properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,schemaRegistryURL);

//create producer
KafkaProducer<String, AvroClass> producer = new KafkaProducer<>(properties);
return producer;
}

但是在运行我的 Kafka Producer 时出现此错误:

    Exception in thread "Thread-1" Exception in thread "Thread-3" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.MalformedURLException: no protocol: 127.0.0.1:8081/subjects/teste1-value/versions
at java.base/java.net.URL.(URL.java:644)
at java.base/java.net.URL.(URL.java:540)
at java.base/java.net.URL.(URL.java:487)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:175)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:903)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
at belliPack.Kafka.Kafka_Producer.sendData(Kafka_Producer.java:32)
at belliPack.OPC.ExtractNodeValues.run(ExtractNodeValues.java:82)
at java.base/java.lang.Thread.run(Thread.java:834)
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.MalformedURLException: no protocol: 127.0.0.1:8081/subjects/teste1-value/versions
at java.base/java.net.URL.(URL.java:644)
at java.base/java.net.URL.(URL.java:540)
at java.base/java.net.URL.(URL.java:487)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:175)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:903)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
at belliPack.Kafka.Kafka_Producer.sendData(Kafka_Producer.java:32)
at belliPack.OPC.ExtractNodeValues.run(ExtractNodeValues.java:82)
at java.base/java.lang.Thread.run(Thread.java:834)```

最佳答案

java.net.MalformedURLException: no protocol

客户端如何知道您需要 http 还是 https?没有默认值,因此您必须在注册表 URL 上提供它

关于java - 序列化 Avro 消息时出错 - Kafka 架构注册表,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60683325/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com