gpt4 book ai didi

apache-spark - 按列分区但保持固定分区计数的有效方法是什么?

转载 作者:行者123 更新时间:2023-12-03 07:10:57 25 4
gpt4 key购买 nike

按字段将数据分区为预定义分区计数的最佳方法是什么?

我当前正在通过指定partionCount=600 对数据进行分区。发现计数 600 可为我的数据集/集群设置提供最佳查询性能。

val rawJson = sqlContext.read.json(filename).coalesce(600)
rawJson.write.parquet(filenameParquet)

现在我想按“eventName”列对此数据进行分区,但仍保留计数 600。数据当前有大约 2000 个唯一的 eventName,加上每个 eventName 中的行数并不统一。大约有 10 个 eventName 拥有超过 50% 的数据,导致数据倾斜。因此,如果我像下面这样进行分区,它的性能不是很好。写入所花费的时间比没有写入的时间多 5 倍。

val rawJson = sqlContext.read.json(filename)
rawJson.write.partitionBy("eventName").parquet(filenameParquet)

针对这些场景对数据进行分区的好方法是什么?有没有办法按 eventName 进行分区,但将其分散到 600 个分区?

我的架构如下所示:

{  
"eventName": "name1",
"time": "2016-06-20T11:57:19.4941368-04:00",
"data": {
"type": "EventData",
"dataDetails": {
"name": "detailed1",
"id": "1234",
...
...
}
}
}

谢谢!

最佳答案

这是数据倾斜的常见问题,您可以采取多种方法。

如果倾斜随着时间的推移保持稳定,列表分桶就可以工作,情况可能是也可能不是,特别是在引入分区变量的新值的情况下。我还没有研究过随着时间的推移调整列表存储是多么容易,并且正如您的评论所述,无论如何您都不能使用它,因为它是 Spark 2.0 功能。

如果您使用的是 1.6.x,关键的观察结果是您可以创建自己的函数,将每个事件名称映射到 600 个唯一值之一。您可以将其作为 UDF 或 case 表达式来执行。然后,您只需使用该函数创建一个列,然后使用 repartition(600, 'myPartitionCol)(而不是 coalesce(600))按该列进行分区。

因为我们在 Swoop 处理非常倾斜的数据,我发现以下主力数据结构对于构建分区相关工具非常有用。

/** Given a key, returns a random number in the range [x, y) where
* x and y are the numbers in the tuple associated with a key.
*/
class RandomRangeMap[A](private val m: Map[A, (Int, Int)]) extends Serializable {
private val r = new java.util.Random() // Scala Random is not serializable in 2.10

def apply(key: A): Int = {
val (start, end) = m(key)
start + r.nextInt(end - start)
}

override def toString = s"RandomRangeMap($r, $m)"
}
例如,以下是我们如何针对稍微不同的情况构建分区器:数据倾斜且键的数量很少,因此我们必须增加倾斜键的分区数量,同时坚持使用 1 作为每个键的最小分区数:

/** Partitions data such that each unique key ends in P(key) partitions.
* Must be instantiated with a sequence of unique keys and their Ps.
* Partition sizes can be highly-skewed by the data, which is where the
* multiples come in.
*
* @param keyMap maps key values to their partition multiples
*/
class ByKeyPartitionerWithMultiples(val keyMap: Map[Any, Int]) extends Partitioner {
private val rrm = new RandomRangeMap(
keyMap.keys
.zip(
keyMap.values
.scanLeft(0)(_+_)
.zip(keyMap.values)
.map {
case (start, count) => (start, start + count)
}
)
.toMap
)

override val numPartitions =
keyMap.values.sum

override def getPartition(key: Any): Int =
rrm(key)
}

object ByKeyPartitionerWithMultiples {

/** Builds a UDF with a ByKeyPartitionerWithMultiples in a closure.
*
* @param keyMap maps key values to their partition multiples
*/
def udf(keyMap: Map[String, Int]) = {
val partitioner = new ByKeyPartitionerWithMultiples(keyMap.asInstanceOf[Map[Any, Int]])
(key:String) => partitioner.getPartition(key)
}

}

就您而言,您必须将多个事件名称合并到一个分区中,这需要进行更改,但我希望上面的代码能让您了解如何解决该问题。

最后一个观察结果是,如果随着时间的推移,数据中事件名称的分布值很大,您可以对数据的某些部分执行统计收集以计算映射表。您不必一直这样做,只在需要时才这样做。要确定这一点,您可以查看每个分区中的行数和/或输出文件的大小。换句话说,整个过程可以作为 Spark 作业的一部分实现自动化。

关于apache-spark - 按列分区但保持固定分区计数的有效方法是什么?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38670369/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com