gpt4 book ai didi

python-3.x - 使用python编译的protobuf pb2作为键值序列化器

转载 作者:行者123 更新时间:2023-12-04 08:38:17 24 4
gpt4 key购买 nike

我正在尝试从使用谷歌的 protobuf 序列化的 kafka topiv 读取数据。
我使用 protoc 编译了 proto 文件生成了 pb2文件。
现在我正在尝试使用 faust 并创建一个流处理器,但我找不到将 pb2 文件用作 key_serializer 的正确方法。和 value_serializer .
这是我尝试过的:

import faust
from proto.topic_pb2 import topic


app = faust.App(
'faust-consumer',
broker='kafka://',
store="memory://",
cache="memory://",
)

schema = faust.Schema(
## key_type=topic.PK,
## value_type=topic,
key_serializer=topic.PK,
value_serializer=topic,
)

topic = app.topic(
'topic',
schema=schema
)


@app.agent(topic)
async def consume(topic):
async for event in topic:
print(event)


if __name__ == "__main__":
app.main()
有人知道如何在序列化程序中使用 pb2 吗?

最佳答案

伙计,过去一周我也在尝试做同样的事情。经过努力,我终于得到了一些工作 - 不是最好的方式 - 但它运作良好。
所以最初我使用了这个 python 编译器:https://github.com/danielgtaylor/python-betterproto生成 *.py带有数据类/类型提示的文件。
然后,我能够创建 Faust.Record通过使用帮助程序动态类:

import abc
import inspect
from typing import Type

import betterproto
import faust

GENERATED_SUFFIX = "__FaustRecord_Auto"


def _import_relative_class(module: str, klass_name: str):
resolved_import = __import__(module, fromlist=[klass_name])
klass = getattr(resolved_import, klass_name)
return klass


def _is_record(attype: Type):
return (
inspect.isclass(attype)
and isinstance(attype, betterproto.Message)
or isinstance(attype, abc.ABCMeta)
)


def _build_record_annotations(klass: Type):
annotations = {}
for atname, attype in klass.__annotations__.items():
if _is_record(attype):
annotations[atname] = make_faust_record(attype)
elif isinstance(attype, str):
subklass = _import_relative_class(klass.__module__, attype)
annotations[atname] = make_faust_record(subklass)
else:
annotations[atname] = attype

return annotations


def make_faust_record(klass: Type):
type_name = f"{klass.__name__}{GENERATED_SUFFIX}"
record_type = type(type_name, (faust.Record, klass), {})
record_type.__annotations__ = _build_record_annotations(klass)
record_type._init_subclass()

return record_type
现在你可以像这样使用它:
import faust
from proto.your_models import YourModel # Import your generated proto here
from faust_converter import make_faust_record


app = faust.App(
'faust-consumer',
broker='kafka://',
store="memory://",
cache="memory://",
)

model_record = make_faust_record(YourModel)

topic = app.topic(
'topic',
value_type=model_record
)


@app.agent(topic)
async def consume(topic):
async for event in topic:
print(event)


if __name__ == "__main__":
app.main()

关于python-3.x - 使用python编译的protobuf pb2作为键值序列化器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/64686686/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com