gpt4 book ai didi

java - 使用Python的Avro库推送数据时Kafka AVRO反序列化错误

转载 作者:行者123 更新时间:2023-12-02 10:27:22 26 4
gpt4 key购买 nike

我已经设置了一个 Kafka 集群,其中一个 Kafka 连接节点具有 Postgres 的接收器配置。

AVRO 架构:

{
"namespace": "example.avro",
"type": "record",
"name": "topicname",
"fields": [
{"name": "deviceid", "type": "string"},
{"name": "longitude", "type": "float"},
{"name": "latitude", "type": "float"}
]
}

我发布数据的Python代码是:

# Path to user.avsc avro schema
SCHEMA_PATH = "user.avsc"
SCHEMA = avro.schema.parse(open(SCHEMA_PATH).read())

writer = DatumWriter(SCHEMA)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write({"deviceid":"9098", "latitude": 90.34 , "longitude": 334.4}, encoder)
raw_bytes = bytes_writer.getvalue()
PRODUCER.send_messages(TOPIC, raw_bytes)

我在 Kafka Connect 日志中收到以下错误:

org.apache.kafka.common.errors.SerializationException: Error
deserializing Avro message for id -1\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n","id":0,"worker_id":"0.0.0.0:8083"}],"type":"sink"}

可能是什么问题?
或者对于上述 json 数据来说,正确的 avro 方案应该是什么?

最佳答案

我没有对各种 python 客户端做太多工作,但是几乎可以肯定这个神奇的字节错误是因为您发送的内容可能是有效的 avro,但是如果您想与架构注册表集成,则有效负载需要位于不同的格式(额外的 header 信息,记录在此处https://docs.confluent.io/current/schema-registry/docs/serializer-formatter.html搜索有线格式或魔术字节)。我个人会尝试使用confluence的python kafka客户端--https://github.com/confluentinc/confluent-kafka-python -- 它有使用 Avro 和架构注册表的示例。

关于java - 使用Python的Avro库推送数据时Kafka AVRO反序列化错误,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/53849038/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com