gpt4 book ai didi

apache-spark - RDD/Dataframe 的分区位置

转载 作者:行者123 更新时间:2023-12-04 05:16:41 25 4
gpt4 key购买 nike

我有一个(相当大,想想 10e7 行)DataFrame,我从中根据某些属性过滤元素

val res = data.filter(data(FieldNames.myValue) === 2).select(pk.name, FieldName.myValue) 

我的 DataFrame 有 n 个分区 data.rdd.getNumPartitions

现在我想知道我的行来自哪个分区。我知道我可以用这样的东西遍历所有分区

val temp = res.first() //or foreach, this is just an example
data.foreachPartition(f => {
f.exists(row => row.get(0)==temp.get(0))
//my code here
}) //compare PKs

data.rdd.mapPartitionsWithIndex((idx, f) => ...)

但是,如果我的结果和我的 DataFrame 变大,这似乎过分而且性能也不是很好。

在我执行 filter() 操作后,是否有 Spark 方法来执行此操作?

或者,是否有一种方法可以重写/替代 filter() 语句,以便它返回行的来源?

我也可以将分区位置保存在我的 DataFrame 中并在重新分区时更新它,但我宁愿以 Spark 方式进行

(我发现的唯一类似问题是 here,问题和评论都不是很有帮助。我还发现 this 可能相似但不相同)

在此先感谢您的帮助/指点,如果我错过了一个与我的问题类似但已经得到回答的问题,我深表歉意。

最佳答案

分区数/计数不稳定,因为 Spark 会在分区中执行自动扩展和减少。例如,这意味着输入分区计数可能与输入文件计数不同。

这些情况下的一般模式是根据每个输入文件中的数据创建某种类型的复合键。如果 key 很大,您可以对其进行散列以减小大小。如果您不太关心碰撞,请使用 Murmur3。如果您担心冲突,请使用 MD5,它仍然非常快。

如果您拥有的唯一独特特征是输入文件的路径,则必须将文件路径添加为区分列。这是一种方法:

val paths = Seq(...)
val df = paths
.map { path =>
sqlContext.read.parquet(path)
.withColumn("path", lit(path))
}
.reduceLeft(_ unionAll _)

想法很简单:一次读取一个输入文件,添加一个与它们关联的唯一列,然后使用UNION ALL 将它们组合在一起。

关于apache-spark - RDD/Dataframe 的分区位置,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38522540/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com