gpt4 book ai didi

python - Spark Python Avro Kafka 解串器

转载 作者:太空宇宙 更新时间:2023-11-03 13:15:58 24 4
gpt4 key购买 nike

我在 python spark 应用程序中创建了一个 kafka 流,可以解析通过它的任何文本。

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})

我想更改它以便能够解析来自 kafka 主题的 avro 消息。从文件解析 avro 消息时,我会这样做:

            reader = DataFileReader(open("customer.avro", "r"), DatumReader())  

我是 python 和 spark 的新手,如何更改流以便能够解析 avro 消息?另外,如何指定从 Kafka 读取 Avro 消息时要使用的模式???我以前用 Java 做过所有这些,但 Python 让我感到困惑。

编辑:

我尝试更改以包含 avro 解码器

            kafkaStream = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1},valueDecoder=avro.io.DatumReader(schema))

但我得到以下错误

            TypeError: 'DatumReader' object is not callable

最佳答案

我遇到了同样的挑战 - 在 pyspark 中反序列化来自 Kafka 的 avro 消息,并使用 Confluent Schema Registry 模块的 Messageserializer 方法解决了它,因为在我们的例子中,模式存储在 Confluent Schema Registry 中。

您可以在 https://github.com/verisign/python-confluent-schemaregistry 找到该模块

from confluent.schemaregistry.client import CachedSchemaRegistryClient
from confluent.schemaregistry.serializers import MessageSerializer
schema_registry_client = CachedSchemaRegistryClient(url='http://xx.xxx.xxx:8081')
serializer = MessageSerializer(schema_registry_client)


# simple decode to replace Kafka-streaming's built-in decode decoding UTF8 ()
def decoder(s):
decoded_message = serializer.decode_message(s)
return decoded_message

kvs = KafkaUtils.createDirectStream(ssc, ["mytopic"], {"metadata.broker.list": "xxxxx:9092,yyyyy:9092"}, valueDecoder=decoder)

lines = kvs.map(lambda x: x[1])
lines.pprint()

很明显,如您所见,这段代码使用了没有接收器的新直接方法,因此使用了 createdDirectStream(更多信息请参见 https://spark.apache.org/docs/1.5.1/streaming-kafka-integration.html)

关于python - Spark Python Avro Kafka 解串器,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/30339636/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com