作者热门文章
- c - 在位数组中找到第一个零
- linux - Unix 显示有关匹配两种模式之一的文件的信息
- 正则表达式替换多个文件
- linux - 隐藏来自 xtrace 的命令
我有一堆以复合键和值的形式存在的元组。例如,
tfile.collect() = [(('id1','pd1','t1'),5.0),
(('id2','pd2','t2'),6.0),
(('id1','pd1','t2'),7.5),
(('id1','pd1','t3'),8.1) ]
我想对这个集合执行类似于 sql 的操作,我可以在其中根据 id[1..n] 或 pd[1..n] 聚合信息。我想使用 vanilla pyspark api 来实现,而不是使用 SQLContext。在我当前的实现中,我正在读取一堆文件并合并 RDD。
def readfile():
fr = range(6,23)
tfile = sc.union([sc.textFile(basepath+str(f)+".txt")
.map(lambda view: set_feature(view,f))
.reduceByKey(lambda a, b: a+b)
for f in fr])
return tfile
我打算创建一个聚合数组作为值。例如,
agg_tfile = [((id1,pd1),[5.0,7.5,8.1])]
其中 5.0,7.5,8.1 代表 [t1,t2,t3] 。我目前正在使用字典通过 Vanilla python 代码实现相同的目标。它适用于较小的数据集。但我担心这可能无法扩展到更大的数据集。是否有使用 pyspark apis 实现相同目标的有效方法?
最佳答案
我的猜测是你想根据多个字段转置数据。
一种简单的方法是连接您将作为分组依据的目标字段,并使其成为成对 RDD 中的键。例如:
lines = sc.parallelize(['id1,pd1,t1,5.0', 'id2,pd2,t2,6.0', 'id1,pd1,t2,7.5', 'id1,pd1,t3,8.1'])
rdd = lines.map(lambda x: x.split(',')).map(lambda x: (x[0] + ', ' + x[1], x[3])).reduceByKey(lambda a, b: a + ', ' + b)
print rdd.collect()
然后你会得到转置后的结果。
[('id1, pd1', '5.0, 7.5, 8.1'), ('id2, pd2', '6.0')]
关于python - 如何在spark中按多个键分组?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/29372792/
我是一名优秀的程序员,十分优秀!