gpt4 book ai didi

python - Spark 数据帧 : Skewed Partition after Join

转载 作者:太空宇宙 更新时间:2023-11-03 10:55:52 25 4
gpt4 key购买 nike

我有两个数据框,df1 有 2200 万条记录,df2 有 200 万条记录。我在 email_address 上做正确的加入作为键。

test_join = df2.join(df1, "email_address", how = 'right').cache()

两个数据框中的重复邮件(如果有的话)很少。加入后,我试图找到结果数据帧的分区大小 test_join,使用以下代码:

l = builder.rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
print(max(l,key=lambda item:item[1]),min(l,key=lambda item:item[1]))

结果显示最大的分区大约是平均分区大小的 100 倍。分区大小的这种倾斜给后连接转换和操作带来了性能问题。

我知道我可以在加入后使用 repartion(num_partitions) 命令对它进行同样的重新分区,但我的问题是为什么我会遇到这种不均匀的分区结果,有什么办法可以避免它首先。

P.S:只是为了检查问题是否仅与 email_address 哈希函数有关,我还检查了其他几个连接的分区大小,我还在数字键连接中看到了问题。

最佳答案

@user6910411 你说对了。问题出在我的数据上,遵循一些愚蠢的约定来输入空电子邮件,这导致了这个倾斜键问题。

在检查最大分区中的条目后,我开始知道那里发生了什么。我发现这种调试技术非常有用,我相信这可以帮助面临同样问题的其他人。

顺便说一句,这是我写的函数,用于查找 RDD 分区的偏度:

from itertools import islice
def check_skewness(df):
sampled_rdd = df.sample(False,0.01).rdd.cache() # Taking just 1% sample, to make processing fast
l = sampled_rdd.mapPartitionsWithIndex(lambda x,it: [(x,sum(1 for _ in it))]).collect()
max_part = max(l,key=lambda item:item[1])
min_part = min(l,key=lambda item:item[1])
if max_part[1]/min_part[1] > 5: #if difference between largest and smallest partition size is greater than 5 times
print 'Partitions Skewed: Largest Partition',max_part,'Smallest Partition',min_part,'\nSample Content of the largest Partition: \n'
print (sampled_rdd.mapPartitionsWithIndex(lambda i, it: islice(it, 0, 5) if i == max_part[0] else []).take(5))
else:
print 'No Skewness: Largest Partition',max_part,'Smallest Partition',min_part

然后我只传递要检查偏度的数据框,如下所示:

check_skewness(test_join)

它为我提供了有关其偏度的有用信息。

关于python - Spark 数据帧 : Skewed Partition after Join,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41094147/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com