gpt4 book ai didi

apache-spark - 在 Spark Structured Streaming 中反序列化自引用 protobuf

转载 作者:行者123 更新时间:2023-12-04 04:55:21 25 4
gpt4 key购买 nike

我有一个自引用的 protobuf 架构:

message A { 
uint64 timestamp = 1;
repeated A fields = 2;
}

我正在使用 scalaPB 生成相应的 Scala 类,然后按照以下步骤尝试解码从 Kafka 流中使用的消息:

def main(args : Array[String]) {

val spark = SparkSession.builder.
master("local")
.appName("spark session example")
.getOrCreate()

import spark.implicits._

val ds1 = spark.readStream.format("kafka").
option("kafka.bootstrap.servers","localhost:9092").
option("subscribe","student").load()

val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

val query = ds2.writeStream
.outputMode("append")
.format("console")
.start()

query.awaitTermination()

}

This is a related question here on StackOverflow .

但是,Spark Structured Streaming 会在这一行抛出循环引用错误。

val ds2 = ds1.map(row=> row.getAs[Array[Byte]]("value")).map(Student.parseFrom(_))

我理解这是因为递归引用只能在驱动程序中在 Spark 中处理(基本上是 RDDDataset 级别)。有没有人想出一个解决方法,例如通过 UDF 启用递归调用?

最佳答案

事实证明,这是由于 spark 架构的制作方式存在局限性。为了处理大量数据,代码与一部分数据一起分布在所有从节点上,结果通过主节点进行协调。现在,由于工作节点上没有任何东西可以跟踪堆栈,因此工作人员不允许递归,但只能在驱动程序级别进行递归。

简而言之,当前构建的 spark 不可能进行这种递归解析。最好的选择是转移到具有类似库并且可以轻松解析递归 protobuf 文件的 java。

关于apache-spark - 在 Spark Structured Streaming 中反序列化自引用 protobuf,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/51405706/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com