gpt4 book ai didi

sql - 如何获得 Spark RDD 的 SQL row_number 等价物?

转载 作者:行者123 更新时间:2023-12-03 10:41:00 44 4
gpt4 key购买 nike

我需要为具有多列的数据表生成完整的 row_numbers 列表。

在 SQL 中,这看起来像这样:

select
key_value,
col1,
col2,
col3,
row_number() over (partition by key_value order by col1, col2 desc, col3)
from
temp
;

现在,让我们在 Spark 中说我有一个形式为 (K, V) 的 RDD,其中 V=(col1, col2, col3),所以我的条目就像
(key1, (1,2,3))
(key1, (1,4,7))
(key1, (2,2,3))
(key2, (5,5,5))
(key2, (5,5,9))
(key2, (7,5,5))
etc.

我想使用 sortBy()、sortWith()、sortByKey()、zipWithIndex 等命令对这些命令进行排序,并拥有一个具有正确 row_number 的新 RDD
(key1, (1,2,3), 2)
(key1, (1,4,7), 1)
(key1, (2,2,3), 3)
(key2, (5,5,5), 1)
(key2, (5,5,9), 2)
(key2, (7,5,5), 3)
etc.

(我不关心括号,所以形式也可以是 (K, (col1,col2,col3,rownum)))

我该怎么做呢?

这是我的第一次尝试:
val sample_data = Seq(((3,4),5,5,5),((3,4),5,5,9),((3,4),7,5,5),((1,2),1,2,3),((1,2),1,4,7),((1,2),2,2,3))

val temp1 = sc.parallelize(sample_data)

temp1.collect().foreach(println)

// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)
// ((1,2),1,2,3)
// ((1,2),1,4,7)
// ((1,2),2,2,3)

temp1.map(x => (x, 1)).sortByKey().zipWithIndex.collect().foreach(println)

// ((((1,2),1,2,3),1),0)
// ((((1,2),1,4,7),1),1)
// ((((1,2),2,2,3),1),2)
// ((((3,4),5,5,5),1),3)
// ((((3,4),5,5,9),1),4)
// ((((3,4),7,5,5),1),5)

// note that this isn't ordering with a partition on key value K!

val temp2 = temp1.???

还要注意,函数 sortBy 不能直接应用于 RDD,但必须先运行 collect(),然后输出也不是 RDD,而是数组
temp1.collect().sortBy(a => a._2 -> -a._3 -> a._4).foreach(println)

// ((1,2),1,4,7)
// ((1,2),1,2,3)
// ((1,2),2,2,3)
// ((3,4),5,5,5)
// ((3,4),5,5,9)
// ((3,4),7,5,5)

这里有更多的进展,但仍然没有分区:
val temp2 = sc.parallelize(temp1.map(a => (a._1,(a._2, a._3, a._4))).collect().sortBy(a => a._2._1 -> -a._2._2 -> a._2._3)).zipWithIndex.map(a => (a._1._1, a._1._2._1, a._1._2._2, a._1._2._3, a._2 + 1))

temp2.collect().foreach(println)

// ((1,2),1,4,7,1)
// ((1,2),1,2,3,2)
// ((1,2),2,2,3,3)
// ((3,4),5,5,5,4)
// ((3,4),5,5,9,5)
// ((3,4),7,5,5,6)

最佳答案

row_number() over (partition by ... order by ...)功能已添加到 Spark 1.4。这个答案使用 PySpark/DataFrames。

创建一个测试数据帧:

from pyspark.sql import Row, functions as F

testDF = sc.parallelize(
(Row(k="key1", v=(1,2,3)),
Row(k="key1", v=(1,4,7)),
Row(k="key1", v=(2,2,3)),
Row(k="key2", v=(5,5,5)),
Row(k="key2", v=(5,5,9)),
Row(k="key2", v=(7,5,5))
)
).toDF()

添加分区行号:
from pyspark.sql.window import Window

(testDF
.select("k", "v",
F.rowNumber()
.over(Window
.partitionBy("k")
.orderBy("k")
)
.alias("rowNum")
)
.show()
)

+----+-------+------+
| k| v|rowNum|
+----+-------+------+
|key1|[1,2,3]| 1|
|key1|[1,4,7]| 2|
|key1|[2,2,3]| 3|
|key2|[5,5,5]| 1|
|key2|[5,5,9]| 2|
|key2|[7,5,5]| 3|
+----+-------+------+

关于sql - 如何获得 Spark RDD 的 SQL row_number 等价物?,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/27050247/

44 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com