gpt4 book ai didi

performance - Spark withColumn 性能

转载 作者:行者123 更新时间:2023-12-05 07:47:39 25 4
gpt4 key购买 nike

我在spark中写了一些代码如下:

val df = sqlContext.read.json("s3n://blah/blah.gz").repartition(200)

val newdf = df.select("KUID", "XFF", "TS","UA").groupBy("KUID", "XFF","UA").agg(max(df("TS")) as "TS" ).filter(!(df("UA")===""))

val dfUdf = udf((z: String) => {
val parser: UserAgentStringParser = UADetectorServiceFactory.getResourceModuleParser();
val readableua = parser.parse(z)
Array(readableua.getName,readableua.getOperatingSystem.getName,readableua.getDeviceCategory.getName)
})

val df1 = newdf.withColumn("useragent", dfUdf(col("UA"))) ---PROBLEM LINE 1

val df2= df1.map {
case org.apache.spark.sql.Row(col1:String,col2:String,col3:String,col4:String, col5: scala.collection.mutable.WrappedArray[String]) => (col1,col2,col3,col4, col5(0), col5(1), col5(2))
}.toDF("KUID", "XFF","UA","TS","browser", "os", "device")

val dataset =df2.dropDuplicates(Seq("KUID")).drop("UA")
val mobile = dataset.filter(dataset("device")=== "Smartphone" || dataset("device") === "Tablet" ).
mobile.write.format("com.databricks.spark.csv").save("s3n://blah/blah.csv")

这是输入数据的示例 {"TS":"1461762084","XFF":"85.255.235.31","IP":"10.75.137.217","KUID":"JilBNVgx","UA":"Flixster/1066 CFNetwork/758.3.15 Darwin /15.4.0"

所以在上面的代码片段中,我正在读取一个 2.4GB 大小的 gz 文件。读取需要 9 分钟。我按 ID 分组并采用最大时间戳。但是(在问题行 1)添加列(带列)的行需要 2 小时。此行采用用户代理并尝试派生操作系统,设备,浏览器信息。这是在这里做事的错误方法吗?

我在具有 r3.4xlarge(8 核和 122Gb 内存)的 4 节点 AWS 集群上运行它,配置如下

--executor-memory 30G --num-executors 9 --executor-cores 5

最佳答案

这里的问题是gzip 不可拆分,不能并行读取。后台发生的事情是单个进程将从存储桶中下载文件,然后重新分区以在整个集群中分发数据。请将输入数据重新编码为 a splittable format .如果输入文件变化不大,您可以考虑 bzip2(因为编码非常昂贵,可能需要一些时间)。

关于performance - Spark withColumn 性能,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39479951/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com