gpt4 book ai didi

apache-spark - Spark RDD union 非常慢

转载 作者:行者123 更新时间:2023-12-03 07:04:51 26 4
gpt4 key购买 nike

我有 2 个 Spark RDD,dataRDD 和 newPairDataRDD,用于 Spark SQL 查询。当我的应用程序初始化时,dataRDD 将被初始化。指定hbase实体中的所有数据都会存储到dataRDD中。

当客户端的sql查询到来时,我的APP将获取所有新的更新并插入到newPairDataRDD中。dataRDD 联合 newPairDataRDD 并在 Spark SQL 上下文中注册为表。

我在 dataRDD 中发现了 0 条记录,在 newPairDataRDD 中发现了 1 条新插入的记录。合并需要 4 秒。太慢了

我觉得不太合理。有人知道如何让它更快吗?谢谢简单代码如下

    // Step1: load all data from hbase to dataRDD when initial, this only run once. 
JavaPairRDD<String, Row> dataRDD= getAllBaseDataToJavaRDD();
dataRDD.cache();
dataRDD.persist(StorageLevel.MEMORY_ONLY());
logger.info(dataRDD.count());

// Step2: when spark sql query coming, load latest updated and inserted data from db to newPairDataRDD

JavaPairRDD<String, Row> newPairDataRDD = getUpdateOrInstertBaseDataToJavaRDD();
// Step3: if count>0 do union and reduce

if(newPairDataRDD.count() > 0) {

JavaPairRDD<String, Row> unionedRDD =dataRDD.union(newPairDataRDD);

// if data was updated in DB, need to delete the old version from the dataRDD.

dataRDD = unionedRDD.reduceByKey(
new Function2<Row, Row, Row>() {
// @Override
public Row call(Row r1, Row r2) {
return r2;
}
});
}
//step4: register the dataRDD
JavaSchemaRDD schemaRDD = sqlContext.applySchema(dataRDD..values(), schema);

//step5: execute sql query
retRDD = sqlContext.sql(sql);
List<org.apache.spark.sql.api.java.Row> rows = retRDD.collect();

从 Spark Web ui 中,我可以看到如下。显然它需要 4s 来联合

已完成的阶段(8)

StageId 描述提交的持续时间任务:成功/总输入随机读取随机写入

6 在 SparkPlan.scala 收集:85+详细信息 1/4/2015 8:17 2 s 8-Aug 156.0 B

7 联合位于 SparkSqlQueryForMarsNew.java:389+details 1/4/2015 8:17 4 s 8-8-8 64.0 B 156.0 B

最佳答案

实现您想要的效果的更有效方法是使用 cogroup()flatMapValues(),使用 union 除了添加新分区之外几乎没有什么作用dataRDD,这意味着所有数据必须在 reduceByKey() 之前进行混洗。 cogroup()flatMapValues() 将导致仅对 newPairDataRDD 进行重新分区。

JavaPairRDD<String, Tuple2<List<Row>, List<Row>>> unionedRDD = dataRDD.cogroup(newPairDataRDD);
JavaPairRDD<String, Row> updated = unionedRDD.flatMapValues(
new Function<Tuple2<List<Row>, List<Row>>, Iterable<Row>>() {
public Iterable<Row> call(Tuple2<List<Row>, List<Row>> grouped) {
if (grouped._2.nonEmpty()) {
return grouped._2;
} else {
return grouped._1;
}
}
});

或者在 Scala 中

val unioned = dataRDD.cogroup(newPairDataRDD)
val updated = unioned.flatMapValues { case (oldVals, newVals) =>
if (newVals.nonEmpty) newVals else oldVals
}

免责声明,我不习惯用Java编写spark!以上如有错误还请大家指正!

关于apache-spark - Spark RDD union 非常慢,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27772298/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com