gpt4 book ai didi

python - 异常 : Pulsar error: IncompatibleSchema

转载 作者:行者123 更新时间:2023-12-05 05:47:05 30 4
gpt4 key购买 nike

我是 Pulsar 的新手,我只是在探索新项目的功能。我正在尝试一个非常基本的示例来基于模式从生产者发送数据。为了提供一些背景知识,我的想法是将数据从 apache-pulsar 发送到 Clickhouse 数据库。我已经完成了接收器连接器的设置,并使用下面的命令对其进行了验证

bin/pulsar-admin sinks status --tenant public --namespace default --name jdbc-clickhouse-sink

bin/pulsar-admin sinks list --tenant public --namespace default输出:[“jdbc-clickhouse-sink”]

所以我在 Clickhouse DB 中创建了一个表。我希望将数据发送到应保存在数据库中的主题。这样做时,我想保持模式一致,因此我想设置一个模式。下面的示例代码

import pulsar
from pulsar.schema import *

class Example(Record):
a = Integer()
b = Integer()
c = Integer()


client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer(
topic='my-topic',
schema=AvroSchema(Example) )

producer.send(Example( a=444 , b=62, c=999 ))

当我运行上面的代码时,出现以下错误

---------------------------------------------------------------------------
Exception Traceback (most recent call last)
<ipython-input-114-3b0aa7d0415f> in <module>
9
10 client = pulsar.Client('pulsar://localhost:6650' class="ansi-blue-fg">)
---> 11 producer = client.create_producer(
12 topic='my-topic',
13 schema=AvroSchema(Example) )

~/opt/anaconda3/lib/python3.8/site-packages/pulsar/__init__.py in
create_producer(self, topic, producer_name, schema, initial_sequence_id,
send_timeout_millis, compression_type, max_pending_messages,
max_pending_messages_across_partitions, block_if_queue_full, batching_enabled,
batching_max_messages, batching_max_allowed_size_in_bytes,
batching_max_publish_delay_ms, message_routing_mode, properties, batching_type)
560
561 p = Producer()
--> 562 p._producer = self._client.create_producer(topic, conf)
563 p._schema = schema
564 return p

Exception: Pulsar error: IncompatibleSchema

有人可以帮助我在这里缺少什么吗

最佳答案

确保你已经安装了带有 avro 的 Pulsar Python 客户端

pip3 安装 fastavropip3 安装 pytzpip3 安装 pulsar-client[avro]

在这里查看我的 python 示例和模式 https://github.com/tspannhw/FLiP-Pi-Weather/blob/main/weather.py

看看我的例子 https://github.com/tspannhw/FLiP-Stream2Clickhouse

检查你的架构bin/pulsar-admin 模式获得持久性://public/default/my-topic

Python 文档 https://pulsar.apache.org/api/python/ https://pulsar.apache.org/api/python/schema/schema.m.html#pulsar.schema.schema.AvroSchema

Pulsar 每个客户端可用的功能 https://docs.google.com/spreadsheets/d/1YHYTkIXR8-Ql103u-IMI18TXLlGStK8uJjDsOOA0T20/edit

您可能需要从实际的 Avro Schema 文件生成类,这通常是在 Java 中完成的。

看这个例子:

https://github.com/ta1meng/pulsar-python-avro-schema-examples

如果不需要 Avro,JsonSchema 不需要这个额外的步骤

关于python - 异常 : Pulsar error: IncompatibleSchema,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/71085489/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com