gpt4 book ai didi

scala - HashPartitioner 是如何工作的?

转载 作者:可可西里 更新时间:2023-11-01 15:54:11 30 4
gpt4 key购买 nike

我阅读了 HashPartitioner 的文档.不幸的是,除了 API 调用之外,没有太多解释。我假设 HashPartitioner 根据键的散列对分布式集进行分区。例如,如果我的数据是这样的

(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)

所以分区器会将其放入不同的分区中,相同的键落在同一个分区中。但是我不明白构造函数参数的意义

new HashPartitoner(numPartitions) //What does numPartitions do?

对于上面的数据集,如果我这样做,结果会有什么不同

new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)

那么 HashPartitioner 实际上是如何工作的呢?

最佳答案

好吧,让你的数据集稍微有趣一点:

val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)

我们有六个要素:

rdd.count
Long = 6

没有分区器:

rdd.partitioner
Option[org.apache.spark.Partitioner] = None

和八个分区:

rdd.partitions.length
Int = 8

现在让我们定义一个小助手来计算每个分区的元素数量:

import org.apache.spark.rdd.RDD

def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}

由于我们没有分区器,我们的数据集在分区之间均匀分布 (Default Partitioning Scheme in Spark):

countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)

inital-distribution

现在让我们重新划分我们的数据集:

import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))

由于传递给 HashPartitioner 的参数定义了我们期望一个分区的分区数:

rddOneP.partitions.length
Int = 1

因为我们只有一个分区,所以它包含所有元素:

countByPartition(rddOneP).collect
Array[Int] = Array(6)

hash-partitioner-1

请注意,洗牌后值的顺序是不确定的。

如果我们使用 HashPartitioner(2),方法相同

val rddTwoP = rdd.partitionBy(new HashPartitioner(2))

我们将得到 2 个分区:

rddTwoP.partitions.length
Int = 2

由于 rdd 是按关键数据分区的,数据将不再均匀分布:

countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)

因为有三个键,只有两个不同的 hashCode mod numPartitions 值,这里没有什么意外的:

(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))

只是为了确认以上内容:

rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))

hash-partitioner-2

最后使用 HashPartitioner(7) 我们得到七个分区,三个非空分区,每个分区有 2 个元素:

val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)

hash-partitioner-7

总结和注释

  • HashPartitioner 采用定义分区数的单个参数
  • 使用键的 hash 将值分配给分区。 hash 函数可能因语言而异(Scala RDD 可能使用 hashCodeDataSets 使用 MurmurHash 3、PySpark、portable_hash)。

    在这种简单的情况下,key 是一个小整数,您可以假设 hash 是一个身份 (i = hash(i))。

    Scala API 使用 nonNegativeMod根据计算的哈希值确定分区,

  • 如果 key 分布不均匀,您可能会遇到部分集群空闲的情况

  • 键必须是可哈希的。您可以查看我对 A list as a key for PySpark's reduceByKey 的回答阅读有关 PySpark 特定问题的信息。 HashPartitioner documentation 突出显示了另一个可能的问题。 :

    Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[]] or RDD[(Array[], _)] using a HashPartitioner will produce an unexpected or incorrect result.

  • 在 Python 3 中,您必须确保散列是一致的。参见 What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?

  • 哈希分区器既不是单射也不是满射。多个键可以分配给一个分区,一些分区可以保持为空。

  • 请注意,当与 REPL 定义的案例类 (Case class equality in Apache Spark) 结合使用时,当前基于哈希的方法在 Scala 中不起作用。

  • HashPartitioner(或任何其他 Partitioner)打乱数据。除非在多个操作之间重复使用分区,否则它不会减少要混洗的数据量。

关于scala - HashPartitioner 是如何工作的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45997456/

30 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com