gpt4 book ai didi

apache-spark - 如何在读取来自 Kafka 的消息流时处理 Avro 消息?

转载 作者:行者123 更新时间:2023-12-05 05:18:30 26 4
gpt4 key购买 nike

下面的代码从 Kafka 读取消息并且消息在 Avro 中,那么我如何解析消息并将其放入 Spark 2.2.0 中的数据帧?

Dataset<Row> df = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();

https://github.com/databricks/spark-avro图书馆没有流式案例的例子。

最佳答案

how do I parse the message and put it into a dataframe in Spark 2.2.0?

这是您的家庭练习,需要一些编码。

This https://github.com/databricks/spark-avro library had no example for streaming case.

有人告诉我(并在这里看到了几个问题)spark-avro 支持 Spark Structured Streaming(又名 Spark Streams)。它适用于非流式数据集,但无法处理流式数据集。

这就是为什么我写道这是你必须自己编写代码的原因。

可能如下所示(为简单起见,我使用 Scala):

// Step 1. convert messages to be strings
val avroMessages = df.select($"value" cast "string")

// Step 2. Strip the avro layer off
val from_avro = udf { (s: String) => ...processing here... }
val cleanDataset = avroMessages.withColumn("no_avro_anymore", from_avro($"value"))

这将需要开发一个 from_avro 自定义 UDF 来执行您想要的操作(并且类似于 Spark 使用 from_json 标准函数处理 JSON 格式的方式!)


或者(并以更高级的方式?/复杂的方法)编写您自己的自定义流媒体 Source用于 Kafka 中 Avro 格式的数据集,并改用它。

Dataset<Row> df = sparkSession.readStream()
.format("avro-kafka") // <-- HERE YOUR CUSTOM Source
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "topic1")
.load();

我还没有发现 avro-kafka 格式的可行性。它确实可行,但同时做两件事,即从 Kafka 读取 进行 Avro 转换,我不相信这是在 Spark Structured Streaming 和一般软件工程中做事的方式。我希望有一种方法可以一个接一个地应用一种格式,但这在 Spark 2.2.1 中是不可能的(并且也不计划用于 2.3)。

我认为 UDF 是目前最好的解决方案。


只是一个想法,您也可以编写自定义 Kafka Deserializer这将在 Spark 加载消息时进行反序列化。

关于apache-spark - 如何在读取来自 Kafka 的消息流时处理 Avro 消息?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47842570/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com