gpt4 book ai didi

scala - Apache Spark 中的数据帧相等

转载 作者:行者123 更新时间:2023-12-02 07:03:47 25 4
gpt4 key购买 nike

假设 df1 和 df2 是 Apache Spark 中的两个 DataFrame,使用两种不同的机制计算,例如 Spark SQL 与 Scala/Java/Python API。

是否有一种惯用的方法来确定两个数据框是否等效(相等,同构),其中等效性是由数据(每行的列名称和列值)相同来确定的,除了行和列的顺序之外?

这个问题的动机是,计算某些大数据结果的方法通常有很多种,每种方法都有自己的权衡。当人们探索这些权衡时,保持正确性非常重要,因此需要在有意义的测试数据集上检查等效性/相等性。

最佳答案

Scala(PySpark 见下文)

spark-fast-tests库有两种进行 DataFrame 比较的方法(我是库的创建者):

assertSmallDataFrameEquality方法收集驱动节点上的DataFrame并进行比较

def assertSmallDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
if (!actualDF.schema.equals(expectedDF.schema)) {
throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
}
if (!actualDF.collect().sameElements(expectedDF.collect())) {
throw new DataFrameContentMismatch(contentMismatchMessage(actualDF, expectedDF))
}
}

assertLargeDataFrameEquality方法比较分布在多台机器上的DataFrame(代码基本复制自spark-testing-base)

def assertLargeDataFrameEquality(actualDF: DataFrame, expectedDF: DataFrame): Unit = {
if (!actualDF.schema.equals(expectedDF.schema)) {
throw new DataFrameSchemaMismatch(schemaMismatchMessage(actualDF, expectedDF))
}
try {
actualDF.rdd.cache
expectedDF.rdd.cache

val actualCount = actualDF.rdd.count
val expectedCount = expectedDF.rdd.count
if (actualCount != expectedCount) {
throw new DataFrameContentMismatch(countMismatchMessage(actualCount, expectedCount))
}

val expectedIndexValue = zipWithIndex(actualDF.rdd)
val resultIndexValue = zipWithIndex(expectedDF.rdd)

val unequalRDD = expectedIndexValue
.join(resultIndexValue)
.filter {
case (idx, (r1, r2)) =>
!(r1.equals(r2) || RowComparer.areRowsEqual(r1, r2, 0.0))
}

val maxUnequalRowsToShow = 10
assertEmpty(unequalRDD.take(maxUnequalRowsToShow))

} finally {
actualDF.rdd.unpersist()
expectedDF.rdd.unpersist()
}
}

assertSmallDataFrameEquality对于小型 DataFrame 比较来说速度更快,我发现它对于我的测试套件来说足够了。

PySpark

这是一个简单的函数,如果 DataFrame 相等则返回 true:

def are_dfs_equal(df1, df2):
if df1.schema != df2.schema:
return False
if df1.collect() != df2.collect():
return False
return True

或简化

def are_dfs_equal(df1, df2): 
return (df1.schema == df2.schema) and (df1.collect() == df2.collect())

您通常会在测试套件中执行 DataFrame 相等比较,并且在比较失败时需要一条描述性错误消息(True/False 返回值在调试时没有多大帮助)。

使用chispa图书馆访问 assert_df_equality返回测试套件工作流程的描述性错误消息的方法。

关于scala - Apache Spark 中的数据帧相等,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/31197353/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com