gpt4 book ai didi

python - 使用 Bottledwater-pg,Python 消费者如何读取数据?

转载 作者:行者123 更新时间:2023-11-30 23:09:40 27 4
gpt4 key购买 nike

我用Python编写了一个消费者,如下所示:

from kafka import KafkaConsumer
import avro.schema
import avro.io
import io

# To consume messages
consumer = KafkaConsumer('test',
group_id='',
bootstrap_servers=['kafka:9092'])


schema = """
{
"namespace":"com.martinkl.bottledwater.dbschema.public",
"type":"record",
"name":"test",
"fields":[
{"name":"id","type":["int", "null"]},
{"name":"value","type":["string", "null"]}
]
}
"""
schema = avro.schema.parse(schema)

for msg in consumer:
bytes_reader = io.BytesIO(msg.value)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
hello = reader.read(decoder)
print hello

一切看起来都不错,但是当我运行插入数据到 Postgres 时:

postgres=# insert into test (value) values('hello world!');

消费者的输出为空:

$ python consumer_bottledwater-pg.py 
{u'id': 0, u'value': u''}

请帮我解决这个问题。预先感谢您。

最佳答案

Bottled Water 发布到 Kafka 的 Avro 编码消息以 5 字节 header 为前缀。第一个字节始终为零(保留供将来使用),接下来的 4 个字节是大端 32 位数字,指示架构 ID。

在您的示例中,您已在 Python 应用程序中对架构进行了硬编码,但一旦上游数据库架构发生更改,该方法就会崩溃。这就是为什么瓶装水最好与 schema registry 一起使用。 。当您从 Kafka 读取消息时,您首先解码 header 以查找架构 ID,如果您之前没有见过该架构 ID,则 query the registry找到架构。然后您可以使用该模式解码消息的其余部分。架构可以缓存在使用者中,因为注册表保证特定 ID 的架构是不可变的。

您还可以查看KafkaAvroDeserializer的代码随模式注册表一起提供,以查看 Java 中如何完成此解码。您可以在 Python 中执行相同的操作。

关于python - 使用 Bottledwater-pg,Python 消费者如何读取数据?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31047163/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com