gpt4 book ai didi

scala - 如何使用 Spark 比较两个表的列?

转载 作者:行者123 更新时间:2023-12-02 20:23:50 26 4
gpt4 key购买 nike

我试图通过读取为 DataFrames 来比较两个表()。对于这些表中的每个公共(public)列,使用主键连接说 order_id 与 order_date、order_name、order_event 等其他列。

我正在使用的 Scala 代码

val primary_key=order_id
for (i <- commonColumnsList){
val column_name = i
val tempDataFrameForNew = newDataFrame.selectExpr(s"concat($primaryKey,$i) as concatenated")
val tempDataFrameOld = oldDataFrame.selectExpr(s"concat($primaryKey,$i) as concatenated")

//Get those records which aren common in both old/new tables
matchCountCalculated = tempDataFrameForNew.intersect(tempDataFrameOld)
//Get those records which aren't common in both old/new tables
nonMatchCountCalculated = tempDataFrameOld.unionAll(tempDataFrameForNew).except(matchCountCalculated)

//Total Null/Non-Null Counts in both old and new tables.
nullsCountInNewDataFrame = newDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
nullsCountInOldDataFrame = oldDataFrame.select(s"$i").filter(x => x.isNullAt(0)).count().toInt
nonNullsCountInNewDataFrame = newDFCount - nullsCountInNewDataFrame
nonNullsCountInOldDataFrame = oldDFCount - nullsCountInOldDataFrame

//Put the result for a given column in a Seq variable, later convert it to Dataframe.
tempSeq = tempSeq :+ Row(column_name, matchCountCalculated.toString, nonMatchCountCalculated.toString, (nullsCountInNewDataFrame - nullsCountInOldDataFrame).toString,
(nonNullsCountInNewDataFrame - nonNullsCountInOldDataFrame).toString)
}
// Final Step: Create DataFrame using Seq and some Schema.
spark.createDataFrame(spark.sparkContext.parallelize(tempSeq), schema)


上面的代码适用于中等数据集,但随着我的新旧表中列和记录数量的增加,执行时间也在增加。任何形式的建议表示赞赏。
先感谢您。

最佳答案

您可以执行以下操作:
1. 在主键上外连接新旧数据框joined_df = df_old.join(df_new, primary_key, "outer")2. 尽可能缓存它。这将为您节省大量时间
3. 现在您可以使用 spark 函数迭代列并比较列(.isNull 表示不匹配,== 表示匹配等)

for (col <- df_new.columns){
val matchCount = df_joined.filter(df_new[col].isNotNull && df_old[col].isNotNull).count()
val nonMatchCount = ...
}

这应该会快很多,尤其是当您可以缓存数据帧时。如果你不能,这可能是一个好主意,所以将加入的 df 保存到磁盘,以避免每次洗牌

关于scala - 如何使用 Spark 比较两个表的列?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/58629919/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com