gpt4 book ai didi

python - 通过 Python 使用 Spark 准备我的大数据

转载 作者:塔克拉玛干 更新时间:2023-11-03 04:41:28 26 4
gpt4 key购买 nike

我的 100m 大小,量化数据:

(1424411938', [3885, 7898])
(3333333333', [3885, 7898])

期望的结果:

(3885, [3333333333, 1424411938])
(7898, [3333333333, 1424411938])

所以我想要的是转换数据,以便我将 3885(例如)与拥有它的所有 data[0] 分组)。这是我在 中所做的:

def prepare(data):
result = []
for point_id, cluster in data:
for index, c in enumerate(cluster):
found = 0
for res in result:
if c == res[0]:
found = 1
if(found == 0):
result.append((c, []))
for res in result:
if c == res[0]:
res[1].append(point_id)
return result

但是当我使用 prepare()mapPartitions()'ed data RDD 时,它似乎只在当前分区,从而返回比预期更大的结果。

例如,如果开头的第一条记录在第一个分区中,第二条记录在第二个分区中,那么我会得到这样的结果:

(3885, [3333333333])
(7898, [3333333333])
(3885, [1424411938])
(7898, [1424411938])

如何修改我的 prepare() 以获得预期的效果?或者,如何处理 prepare() 产生的结果,以便获得所需的结果?


您可能已经从代码中注意到,我根本不关心速度。

这是一种创建数据的方法:

data = []
from random import randint
for i in xrange(0, 10):
data.append((randint(0, 100000000), (randint(0, 16000), randint(0, 16000))))
data = sc.parallelize(data)

最佳答案

您可以使用一组基本的 pyspark 转换来实现这一点。

>>> rdd = sc.parallelize([(1424411938, [3885, 7898]),(3333333333, [3885, 7898])])
>>> r = rdd.flatMap(lambda x: ((a,x[0]) for a in x[1]))

我们使用 flatMapx[1] 中的每个项目设置键值对,并将数据行格式更改为 (a, x [0]),这里的a就是x[1]中的每一项。要更好地理解 flatMap,您可以查看文档。

>>> r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])))

我们只是将所有键值对按键分组,并使用元组函数将可迭代对象转换为元组。

>>> r2.collect()
[(3885, (1424411938, 3333333333)), (7898, (1424411938, 3333333333))]

正如您所说,您可以使用 [:150] 来获得前 150 个元素,我想这是正确的用法:

r2 = r.groupByKey().map(lambda x: (x[0],tuple(x[1])[:150]))

我尽量解释清楚。我希望这会有所帮助。

关于python - 通过 Python 使用 Spark 准备我的大数据,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/39401690/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com