gpt4 book ai didi

Python Spark 如何使用 RDD API 按组查找累计和

转载 作者:太空宇宙 更新时间:2023-11-04 02:54:19 25 4
gpt4 key购买 nike

我是 spark 编程的新手。需要有关 spark python 程序的帮助,我在其中输入了这样的数据,并希望获得每个组的累积摘要。如果有人指导我,我将不胜感激。

输入数据:

11,1,1,100

11,1,2,150

12,1,1,50

12,2,1,70

12,2,2,20

需要这样的输出数据:

11,1,1,100

11,1,2,250//(100+150)

12,1,1,50

12,2,1,70

12,2,2,90//(70+20)

我试过的代码:

def parseline(line):
fields = line.split(",")
f1 = float(fields[0])
f2 = float(fields[1])
f3 = float(fields[2])
f4 = float(fields[3])
return (f1, f2, f3, f4)

input = sc.textFile("FIle:///...../a.dat")
line = input.map(parseline)
linesorted = line.sortBy(lambda x: (x[0], x[1], x[2]))
runningpremium = linesorted.map(lambda y: (((y[0], y[1]), y[3])).reduceByKey(lambda accum, num: accum + num)

for i in runningpremium.collect():
print i

最佳答案

如评论中所述,您可以在此处使用窗口函数在 Spark Dataframe 上进行累加和。首先,我们可以创建一个带有虚拟列 'a', 'b', 'c', 'd'

的示例数据框
ls = [(11,1,1,100), (11,1,2,150), (12,1,1,50), (12,2,1,70), (12,2,2,20)]
ls_rdd = spark.sparkContext.parallelize(ls)
df = spark.createDataFrame(ls_rdd, schema=['a', 'b', 'c', 'd'])

您可以按 ab 列进行分区,然后按 c 列排序。然后,在末尾的 d 列上应用 sum 函数

from pyspark.sql.window import Window
import pyspark.sql.functions as func

w = Window.partitionBy([df['a'], df['b']]).orderBy(df['c'].asc())
df_cumsum = df.select('a', 'b', 'c', func.sum(df.d).over(w).alias('cum_sum'))
df_cumsum.sort(['a', 'b', 'c']).show() # simple sort column

输出

+---+---+---+-------+
| a| b| c|cum_sum|
+---+---+---+-------+
| 11| 1| 1| 100|
| 11| 1| 2| 250|
| 12| 1| 1| 50|
| 12| 2| 1| 70|
| 12| 2| 2| 90|
+---+---+---+-------+

关于Python Spark 如何使用 RDD API 按组查找累计和,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/42909218/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com