gpt4 book ai didi

apache-spark - Spark RDD.isEmpty 花费很多时间

转载 作者:行者123 更新时间:2023-12-04 05:28:05 25 4
gpt4 key购买 nike

我构建了一个 Spark 集群
worker :2
核心数:12
内存:总计 32.0 GB,已使用 20.0 GB
每个worker有1个cpu,6个核,10.0GB内存

我的程序从 MongoDB 集群 获取数据源。 SparkMongoDB 集群 在同一个 LAN(1000Mbps) 中。MongoDB 文档 格式:{name:string, value:double, time:ISODate}

大约有 1300 万份文件。

我想从包含 60 个文档的特殊时间获取特殊 name 的平均值。这是我的关键功能

 /*
*rdd=sc.newAPIHadoopRDD(configOriginal, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject])
Apache-Spark-1.3.1 scala doc: SparkContext.newAPIHadoopFile[K, V, F <: InputFormat[K, V]](path: String, fClass: Class[F], kClass: Class[K], vClass: Class[V], conf: Configuration = hadoopConfiguration): RDD[(K, V)]
*/
def findValueByNameAndRange(rdd:RDD[(Object,BSONObject)],name:String,time:Date): RDD[BasicBSONObject]={

val nameRdd = rdd.map(arg=>arg._2).filter(_.get("name").equals(name))
val timeRangeRdd1 = nameRdd.map(tuple=>(tuple, tuple.get("time").asInstanceOf[Date]))
val timeRangeRdd2 = timeRangeRdd1.map(tuple=>(tuple._1,duringTime(tuple._2,time,getHourAgo(time,1))))
val timeRangeRdd3 = timeRangeRdd2.filter(_._2).map(_._1)
val timeRangeRdd4 = timeRangeRdd3.map(x => (x.get("name").toString, x.get("value").toString.toDouble)).reduceByKey(_ + _)

if(timeRangeRdd4.isEmpty()){
return basicBSONRDD(name, time)
}
else{
return timeRangeRdd4.map(tuple => {
val bson = new BasicBSONObject()
bson.put("name", tuple._1)
bson.put("value", tuple._2/60)
bson.put("time", time)
bson })
}
}

这里是部分Job信息 enter image description here enter image description here

我的程序运行得太慢了。是因为 isEmptyreduceByKey 吗?如果是,我该如何改进它?如果不是,为什么?
=======更新===

 timeRangeRdd3.map(x => (x.get("name").toString, x.get("value").toString.toDouble)).reduceByKey(_ + _)

在第34行

enter image description here

我知道reduceByKey 是一个全局操作,可能会花费很多时间,但是,它所花费的超出了我的预算。我该如何改进它或者它是Spark的缺陷。同样的计算和硬件,如果我使用java的多线程,只需要几秒钟。

最佳答案

首先,isEmpty 只是 RDD 阶段结束的点。 mapfilter 不需要随机播放,UI 中使用的方法始终是触发阶段更改/随机播放的方法...在这种情况下 isEmpty。从这个角度看,运行缓慢的原因并不那么容易,尤其是在没有看到原始 RDD 的组成的情况下。我可以告诉您,isEmpty 首先检查 partition 大小,然后执行 take(1) 并验证是否返回了数据。因此,很可能是网络中存在瓶颈或沿途有其他障碍。它甚至可能是 GC 问题...单击 isEmpty 并查看您可以从那里看出更多内容。

关于apache-spark - Spark RDD.isEmpty 花费很多时间,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31737514/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com