gpt4 book ai didi

python - Pyspark:将 repartitionAndSortWithinPartitions 与多个排序条件一起使用

转载 作者:太空狗 更新时间:2023-10-30 02:02:46 31 4
gpt4 key购买 nike

假设我有以下 RDD:

rdd = sc.parallelize([('a', (5,1)), ('d', (8,2)), ('2', (6,3)), ('a', (8,2)), ('d', (9,6)), ('b', (3,4)),('c', (8,3))])

如何使用 repartitionAndSortWithinPartitions 并按 x[0] 和 x[1][0] 排序。使用以下我仅按键(x[0])排序:

Npartitions = sc.defaultParallelism
rdd2 = rdd.repartitionAndSortWithinPartitions(2, lambda x: hash(x) % Npartitions, 2)

一种方法如下,但我想应该有更简单的方法:

Npartitions = sc.defaultParallelism 
partitioned_data = rdd
.partitionBy(2)
.map(lambda x:(x[0],x[1][0],x[1][1]))
.toDF(['letter','number2','number3'])
.sortWithinPartitions(['letter','number2'],ascending=False)
.map(lambda x:(x.letter,(x.number2,x.number3)))

>>> partitioned_data.glom().collect()

[[],
[(u'd', (9, 6)), (u'd', (8, 2))],
[(u'c', (8, 3)), (u'c', (6, 3))],
[(u'b', (3, 4))],
[(u'a', (8, 2)), (u'a', (5, 1))]

可以看出,我必须将其转换为 Dataframe 才能使用 sortWithinPartitions。还有别的办法吗?使用 repartitionAndSortWIthinPartitions

(数据没有全局排序没关系,我只关心分区内的排序。)

最佳答案

这是可能的,但您必须在复合键中包含所有必需的信息:

from pyspark.rdd import portable_hash

n = 2

def partitioner(n):
"""Partition by the first item in the key tuple"""
def partitioner_(x):
return portable_hash(x[0]) % n
return partitioner_


(rdd
.keyBy(lambda kv: (kv[0], kv[1][0])) # Create temporary composite key
.repartitionAndSortWithinPartitions(
numPartitions=n, partitionFunc=partitioner(n), ascending=False)
.map(lambda x: x[1])) # Drop key (note: there is no partitioner set anymore)

逐步解释:

  • keyBy(lambda kv: (kv[0], kv[1][0]))创建一个替代键,它由原始键和值的第一个元素组成。换句话说,它转换:

    (0, (5,1))

    进入

    ((0, 5), (0, (5, 1)))

    在实践中,简单地将数据 reshape 为

    ((0, 5), 1)
  • partitioner根据键的第一个元素的散列定义分区函数,因此:

    partitioner(7)((0, 5))
    ## 0

    partitioner(7)((0, 6))
    ## 0

    partitioner(7)((0, 99))
    ## 0

    partitioner(7)((3, 99))
    ## 3

    如您所见,它是一致的并且忽略了第二位。

  • 我们使用默认 keyfunc函数是身份 ( lambda x: x ) 并依赖于 Python tuple 上定义的词典顺序:

    (0, 5) < (1, 5)
    ## True

    (0, 5) < (0, 4)
    ## False

如前所述,您可以改为 reshape 数据:

rdd.map(lambda kv: ((kv[0], kv[1][0]), kv[1][1]))

并删除最后的 map以提高性能。

关于python - Pyspark:将 repartitionAndSortWithinPartitions 与多个排序条件一起使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/38918342/

31 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com