gpt4 book ai didi

python - 在 pyspark 中,为什么 `limit` 后跟 `repartition` 创建完全相等的分区大小?

转载 作者:太空狗 更新时间:2023-10-30 01:10:57 24 4
gpt4 key购买 nike

根据pyspark documentation , repartition 应该使用散列分区,这会给出稍微不相等的分区大小。但是,我发现通过在它前面加上 limit,它会产生 完全 相等的分区大小。这可以通过在 pyspark shell 中运行以下命令来显示:

df = spark.createDataFrame([range(5)] * 100)

def count_part_size(part_iter):
yield len(list(part_iter))

print(df.repartition(20).rdd.mapPartitions(count_part_size).collect())
# [4, 4, 4, 5, 4, 4, 5, 4, 5, 6, 6, 6, 7, 5, 5, 5, 5, 6, 5, 5]

print(df.limit(100).repartition(20).rdd.mapPartitions(count_part_size).collect())
# [5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]

如果 repartition 使用散列分区程序,为什么在这种情况下它会生成完全相等的分区大小?如果它不使用散列分区器,它使用的是哪种分区器?

顺便说一句,我使用的是python 2.7.15版本和spark 2.0.2版本

最佳答案

这里有四个因素:

  • 如果不提供分区表达式,则repartition不使用HashPartitioning,或者说,不直接使用。相反,它使用 RoundRobinPartitioningwhich (正如您可能猜到的那样)

    Distributes elements evenly across output partitions, starting from a random partition.

    在内部,它在每个分区上生成一个 scala.Int 序列,starting from a random point .只有这些值通过 HashPartitioner 传递。

  • 之所以这样工作是因为 Int hashCode 只是身份 - 换句话说

    ∀x∈Int x = hashCode(x)

    (顺便说一句,这与 Scala Int 范围内的 CPython hash 的行为相同 - -2147483648 到 2147483647。这些哈希的设计根本不是为了加密安全)作为将 HashPartitioner 应用于一系列 Int 值的结果会导致实际的循环分配。

    所以在这种情况下,HashPartitioner 只是作为模运算符工作。

  • 您在重新分区之前应用了 LIMIT,因此所有值都首先洗牌到单个节点。因此,只使用了一个 Int 值序列。

  • 分区数是数据集大小的除数。由于数据可以在分区之间均匀分布。

总体而言,它是预期行为(每个分区应在输出分区之间均匀分布)、管道属性(只有一个输入分区)和数据(数据集可以均匀分布)的组合。

关于python - 在 pyspark 中,为什么 `limit` 后跟 `repartition` 创建完全相等的分区大小?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54835117/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com