gpt4 book ai didi

python - 如何使用 pyspark.resultiterable.ResultIterable 对象

转载 作者:行者123 更新时间:2023-12-02 02:56:01 24 4
gpt4 key购买 nike

我有 1TB 的记录结构在一对 rdd 中,我想按键对我的所有记录进行分组,然后只对值应用一个函数。

我的代码如下:

rdd = sc.textFile("path").map(lambdal:l.split(";"))
rdd_pair=rdd.map(lambda a: (a[0], a))
rdd_pair.take(3)
#output: [('id_client', ('id_client','time','city')]
#[('1', [('1', '2013/03/12 23:59:59', 'London')]
#[('1', [('1', '2013/12/03 10:43:12', 'Rome')]
#[('1', [('1', '2013/05/01 00:09:59', 'Madrid')]

我想按 id_client 对所有记录进行分组,然后仅将函数矩阵应用于值。对于每个键,该函数按“时间”对元组列表进行排序,然后该函数提取从一个城市到另一个城市的过渡。

grouped=rdd_pair.groupByKey(200)
grouped.take(1)
#output [("1",<pyspark.resultiterable.ResultIterable object at 0x7fc659e0a210)]

def matrix(input):
output=[]
input_bag= sorted(input, key=lambda x: x[1], reverse=False)
loc0 = input_bag[0]
for loc in input_bag[1:]:
output.append((loc0[2],loc[2]))
loc0 = loc
return output

transition=grouped.mapValues(lambda k: matrix(k)).filter(lambda l: l[1]!=[])

我想要的输出是:

#output transition: [('1', [('London', 'Madrid'),('Madrid', 'Rome')])]

我收到 Python 错误:列表索引超出范围错误

有人可以帮助我吗?谢谢

最佳答案

我是这样解决的:

def matrix(input):
output=[]
input2=[i[0] for i in input]
input_bag= sorted(input2, key=lambda x: x[1], reverse=False)
loc0 = input_bag[0]
for loc in input_bag[1:]:
output.append((loc0[2],loc[2]))
loc0 = loc
return output

在使用 Python 内置函数“sorted”之前,我将 input(一个可迭代对象)转换为 input2(一个元组列表)

关于python - 如何使用 pyspark.resultiterable.ResultIterable 对象,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/49428524/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com