gpt4 book ai didi

apache-spark - 如何在 PySpark DataFrame 中强制进行特定分区?

转载 作者:行者123 更新时间:2023-12-04 22:39:55 27 4
gpt4 key购买 nike

编辑 2022/02/18:几年后我又回到了这个问题,我相信我下面的新解决方案比目前投票率最高的解决方案性能要好得多。
假设我有一个带有 partition_id 列的 DataFrame :

n_partitions = 2

df = spark.sparkContext.parallelize([
[1, 'A'],
[1, 'B'],
[2, 'A'],
[2, 'C']
]).toDF(('partition_id', 'val'))
如何重新分区 DataFrame 以保证 partition_id 的每个值转到单独的分区,并且实际分区的数量与 partition_id 的不同值完全相同?
如果我做一个哈希分区,即 df.repartition(n_partitions, 'partition_id') ,这保证了正确的分区数,但有些分区可能是空的,而其他分区可能包含多个 partition_id 的值由于哈希冲突。

最佳答案

Python 和 DataFrame 没有这样的选项API。 Dataset 中的分区 API不可插入,仅支持预定义的 range and hash partitioning schemes .

您可以将数据转换为 RDD ,使用自定义分区器进行分区,然后读取转换回 DataFrame :

from pyspark.sql.functions import col, struct, spark_partition_id

mapping = {k: i for i, k in enumerate(
df.select("partition_id").distinct().rdd.flatMap(lambda x: x).collect()
)}

result = (df
.select("partition_id", struct([c for c in df.columns]))
.rdd.partitionBy(len(mapping), lambda k: mapping[k])
.values()
.toDF(df.schema))

result.withColumn("actual_partition_id", spark_partition_id()).show()
# +------------+---+-------------------+
# |partition_id|val|actual_partition_id|
# +------------+---+-------------------+
# | 1| A| 0|
# | 1| B| 0|
# | 2| A| 1|
# | 2| C| 1|
# +------------+---+-------------------+

请记住,这只会创建特定的数据分布,不会设置 Catalyst 优化器可以使用的分区器。

关于apache-spark - 如何在 PySpark DataFrame 中强制进行特定分区?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/50757050/

27 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com