- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我阅读了 HashPartitioner
的文档.不幸的是,除了 API 调用之外,没有太多解释。我假设 HashPartitioner
根据键的散列对分布式集进行分区。例如,如果我的数据是这样的
(1,1), (1,2), (1,3), (2,1), (2,2), (2,3)
所以分区器会将其放入不同的分区中,相同的键落在同一个分区中。但是我不明白构造函数参数的意义
new HashPartitoner(numPartitions) //What does numPartitions do?
对于上面的数据集,如果我这样做,结果会有什么不同
new HashPartitoner(1)
new HashPartitoner(2)
new HashPartitoner(10)
那么 HashPartitioner
实际上是如何工作的呢?
最佳答案
好吧,让你的数据集稍微有趣一点:
val rdd = sc.parallelize(for {
x <- 1 to 3
y <- 1 to 2
} yield (x, None), 8)
我们有六个要素:
rdd.count
Long = 6
没有分区器:
rdd.partitioner
Option[org.apache.spark.Partitioner] = None
和八个分区:
rdd.partitions.length
Int = 8
现在让我们定义一个小助手来计算每个分区的元素数量:
import org.apache.spark.rdd.RDD
def countByPartition(rdd: RDD[(Int, None.type)]) = {
rdd.mapPartitions(iter => Iterator(iter.length))
}
由于我们没有分区器,我们的数据集在分区之间均匀分布 (Default Partitioning Scheme in Spark):
countByPartition(rdd).collect()
Array[Int] = Array(0, 1, 1, 1, 0, 1, 1, 1)
现在让我们重新划分我们的数据集:
import org.apache.spark.HashPartitioner
val rddOneP = rdd.partitionBy(new HashPartitioner(1))
由于传递给 HashPartitioner
的参数定义了我们期望一个分区的分区数:
rddOneP.partitions.length
Int = 1
因为我们只有一个分区,所以它包含所有元素:
countByPartition(rddOneP).collect
Array[Int] = Array(6)
请注意,洗牌后值的顺序是不确定的。
如果我们使用 HashPartitioner(2)
,方法相同
val rddTwoP = rdd.partitionBy(new HashPartitioner(2))
我们将得到 2 个分区:
rddTwoP.partitions.length
Int = 2
由于 rdd
是按关键数据分区的,数据将不再均匀分布:
countByPartition(rddTwoP).collect()
Array[Int] = Array(2, 4)
因为有三个键,只有两个不同的 hashCode
mod numPartitions
值,这里没有什么意外的:
(1 to 3).map((k: Int) => (k, k.hashCode, k.hashCode % 2))
scala.collection.immutable.IndexedSeq[(Int, Int, Int)] = Vector((1,1,1), (2,2,0), (3,3,1))
只是为了确认以上内容:
rddTwoP.mapPartitions(iter => Iterator(iter.map(_._1).toSet)).collect()
Array[scala.collection.immutable.Set[Int]] = Array(Set(2), Set(1, 3))
最后使用 HashPartitioner(7)
我们得到七个分区,三个非空分区,每个分区有 2 个元素:
val rddSevenP = rdd.partitionBy(new HashPartitioner(7))
rddSevenP.partitions.length
Int = 7
countByPartition(rddTenP).collect()
Array[Int] = Array(0, 2, 2, 2, 0, 0, 0)
HashPartitioner
采用定义分区数的单个参数使用键的 hash
将值分配给分区。 hash
函数可能因语言而异(Scala RDD 可能使用 hashCode
,DataSets
使用 MurmurHash 3、PySpark、portable_hash
)。
在这种简单的情况下,key 是一个小整数,您可以假设 hash
是一个身份 (i = hash(i)
)。
Scala API 使用 nonNegativeMod
根据计算的哈希值确定分区,
如果 key 分布不均匀,您可能会遇到部分集群空闲的情况
键必须是可哈希的。您可以查看我对 A list as a key for PySpark's reduceByKey 的回答阅读有关 PySpark 特定问题的信息。 HashPartitioner documentation 突出显示了另一个可能的问题。 :
Java arrays have hashCodes that are based on the arrays' identities rather than their contents, so attempting to partition an RDD[Array[]] or RDD[(Array[], _)] using a HashPartitioner will produce an unexpected or incorrect result.
在 Python 3 中,您必须确保散列是一致的。参见 What does Exception: Randomness of hash of string should be disabled via PYTHONHASHSEED mean in pyspark?
哈希分区器既不是单射也不是满射。多个键可以分配给一个分区,一些分区可以保持为空。
请注意,当与 REPL 定义的案例类 (Case class equality in Apache Spark) 结合使用时,当前基于哈希的方法在 Scala 中不起作用。
HashPartitioner
(或任何其他 Partitioner
)打乱数据。除非在多个操作之间重复使用分区,否则它不会减少要混洗的数据量。
关于scala - HashPartitioner 是如何工作的?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/45997456/
我有一些 Scala 代码,它用两个不同版本的类型参数化函数做了一些漂亮的事情。我已经从我的应用程序中简化了很多,但最后我的代码充满了形式 w(f[Int],f[Double]) 的调用。哪里w()是
如果我在同一目录中有两个单独的未编译的 scala 文件: // hello.scala object hello { def world() = println("hello world") }
val schema = df.schema val x = df.flatMap(r => (0 until schema.length).map { idx => ((idx, r.g
环境: Play 2.3.0/Scala 2.11.1/IntelliJ 13.1 我使用 Typesafe Activator 1.2.1 用 Scala 2.11.1 创建一个新项目。项目创建好后
我只是想知道如何使用我自己的类扩展 Scala 控制台和“脚本”运行程序,以便我可以通过使用实际的 Scala 语言与其通信来实际使用我的代码?我应将 jar 放在哪里,以便无需临时配置即可从每个 S
我已经根据 README.md 文件安装了 ensime,但是,我在低级 ensime-server 缓冲区中出现以下错误: 信息: fatal error :scala.tools.nsc.Miss
我正在阅读《Scala 编程》一书。在书中,它说“一个函数文字被编译成一个类,当在运行时实例化时它是一个函数值”。并且它提到“函数值是对象,因此您可以根据需要将它们存储在变量中”。 所以我尝试检查函数
我有 hello world scala native 应用程序,想对此应用程序运行小型 scala 测试我使用通常的测试命令,但它抛出异常: NativeMain.scala object Nati
有few resources在网络上,在编写与代码模式匹配的 Scala 编译器插件方面很有指导意义,但这些对生成代码(构建符号树)没有帮助。我应该从哪里开始弄清楚如何做到这一点? (如果有比手动构建
我是 Scala 的新手。但是,我用 创建了一个中等大小的程序。斯卡拉 2.9.0 .现在我想使用一个仅适用于 的开源库斯卡拉 2.7.7 . 是吗可能 在我的 Scala 2.9.0 程序中使用这个
有没有办法在 Scala 2.11 中使用 scala-pickling? 我在 sonatype 存储库中尝试了唯一的 scala-pickling_2.11 工件,但它似乎不起作用。我收到消息:
这与命令行编译器选项无关。如何以编程方式获取代码内的 Scala 版本? 或者,Eclipse Scala 插件 v2 在哪里存储 scalac 的路径? 最佳答案 这无需访问 scala-compi
我正在阅读《Scala 编程》一书,并在第 6 章中的类 Rational 实现中遇到了一些问题。 这是我的 Rational 类的初始版本(基于本书) class Rational(numerato
我是 Scala 新手,我正在尝试开发一个使用自定义库的小项目。我在库内创建了一个mysql连接池。这是我的库的build.sbt organization := "com.learn" name :
我正在尝试运行一些 Scala 代码,只是暂时打印出“Hello”,但我希望在 SBT 项目中编译 Scala 代码之前运行 Scala 代码。我发现在 build.sbt 中有以下工作。 compi
Here链接到 maven Scala 插件使用。但没有提到它使用的究竟是什么 Scala 版本。我创建了具有以下配置的 Maven Scala 项目: org.scala-tools
我对 Scala 还很陌生,请多多包涵。我有一堆包裹在一个大数组中的 future 。 future 已经完成了查看几 TB 数据的辛勤工作,在我的应用程序结束时,我想总结上述 future 的所有结
我有一个 scala 宏,它依赖于通过包含其位置的静态字符串指定的任意 xml 文件。 def myMacro(path: String) = macro myMacroImpl def myMacr
这是我的功能: def sumOfSquaresOfOdd(in: Seq[Int]): Int = { in.filter(_%2==1).map(_*_).reduce(_+_) } 为什么我
这个问题在这里已经有了答案: Calculating the difference between two Java date instances (45 个答案) 关闭 5 年前。 所以我有一个这
我是一名优秀的程序员,十分优秀!