gpt4 book ai didi

python - confluence-python 可以生成 avro 中的值和 string 中的键的数据吗?

转载 作者:行者123 更新时间:2023-12-02 08:03:46 36 4
gpt4 key购买 nike

我正在使用 python3 和 confluent-python向 Kafka 发送消息。我需要发送 Avro 格式的值和字符串形式的 key 的数据。但我发现 confluence-python 只能以 Avro 形式发送,或者以字符串形式发送。 confluence-python 源代码如下:

def produce(self, **kwargs):
"""
Asynchronously sends a message to Kafka by encoding with specified or default Avro schema.

:param str topic: topic name
:param object value: An object to serialize
:param str value_schema: Avro schema for value
:param object key: An object to serialize
:param str key_schema: Avro schema for key

Plus any other parameters accepted by confluent_kafka.Producer.produce

:raises SerializerError: On serialization failure
:raises BufferError: If producer queue is full.
:raises KafkaException: For other produce failures.
"""
# get schemas from kwargs if defined
key_schema = kwargs.pop('key_schema', self._key_schema)
value_schema = kwargs.pop('value_schema', self._value_schema)
topic = kwargs.pop('topic', None)
if not topic:
raise ClientError("Topic name not specified.")
value = kwargs.pop('value', None)
key = kwargs.pop('key', None)

if value is not None:
if value_schema:
value = self._serializer.encode_record_with_schema(topic, value_schema, value)
else:
raise ValueSerializerError("Avro schema required for values")

if key is not None:
if key_schema:
key = self._serializer.encode_record_with_schema(topic, key_schema, key, True)
else:
raise KeySerializerError("Avro schema required for key")

super(AvroProducer, self).produce(topic, value, key, **kwargs)

有人知道吗?

最佳答案

定义一个 PrimitiveSchema - 像这样的字符串:

key_schema = avro.loads('{"type": "string"}')

并在构建生产者时像这样使用它:

生产者 = avro.AvroProducer(config=conf,default_key_schema=key_schema,default_value_schema=your_value_schema)

关于python - confluence-python 可以生成 avro 中的值和 string 中的键的数据吗?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54900184/

36 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com