
scala - Spark Scala: Aggregate DataFrame Column Values into an Ordered List

Reposted — Author: 行者123 · Updated: 2023-12-05 00:54:11

I have a Spark Scala DataFrame with four columns: (id, day, val, order). I want to create a new DataFrame with columns (id, day, value_list: List(val1, val2, ..., valn)), where val1 through valn are the val values sorted in ascending order by the order column.

For example:

(50, 113, 1, 1), 
(50, 113, 1, 3),
(50, 113, 2, 2),
(51, 114, 1, 2),
(51, 114, 2, 1),
(51, 113, 1, 1)

would become:
((51,113),List(1))
((51,114),List(2, 1))
((50,113),List(1, 2, 1))

I'm close, but I don't know what to do after aggregating the data into lists. I can't figure out how to get Spark to sort each value list by the order int:
import org.apache.spark.sql.Row
import sqlContext.implicits._  // needed for toDF when not in the shell

val testList = List((50, 113, 1, 1), (50, 113, 1, 3), (50, 113, 2, 2), (51, 114, 1, 2), (51, 114, 2, 1), (51, 113, 1, 1))
val testDF = sqlContext.sparkContext.parallelize(testList).toDF("id1", "id2", "val", "order")

// key by (id1, id2) and collect the (val, order) pairs for each key
val rDD1 = testDF.map{case Row(key1: Int, key2: Int, val1: Int, val2: Int) => ((key1, key2), List((val1, val2)))}
val rDD2 = rDD1.reduceByKey{case (x, y) => x ++ y}

The output looks like this:
((51,113),List((1,1)))
((51,114),List((1,2), (2,1)))
((50,113),List((1,3), (1,1), (2,2)))

The next step would be to produce:
((51,113),List((1,1)))
((51,114),List((2,1), (1,2)))
((50,113),List((1,1), (2,2), (1,3)))

Best Answer

You just need to map over your RDD and use sortBy:

scala> val df = Seq((50, 113, 1, 1), (50, 113, 1, 3), (50, 113, 2, 2), (51, 114, 1, 2), (51, 114, 2, 1), (51, 113, 1, 1)).toDF("id1", "id2", "val", "order")
df: org.apache.spark.sql.DataFrame = [id1: int, id2: int, val: int, order: int]

scala> import org.apache.spark.sql.Row
import org.apache.spark.sql.Row

scala> val rDD1 = df.map{case Row(key1: Int, key2: Int, val1: Int, val2: Int) => ((key1, key2), List((val1, val2)))}
rDD1: org.apache.spark.rdd.RDD[((Int, Int), List[(Int, Int)])] = MapPartitionsRDD[10] at map at <console>:28

scala> val rDD2 = rDD1.reduceByKey{case (x, y) => x ++ y}
rDD2: org.apache.spark.rdd.RDD[((Int, Int), List[(Int, Int)])] = ShuffledRDD[11] at reduceByKey at <console>:30

scala> val rDD3 = rDD2.map(x => (x._1, x._2.sortBy(_._2)))
rDD3: org.apache.spark.rdd.RDD[((Int, Int), List[(Int, Int)])] = MapPartitionsRDD[12] at map at <console>:32

scala> rDD3.collect.foreach(println)
((51,113),List((1,1)))
((50,113),List((1,1), (2,2), (1,3)))
((51,114),List((2,1), (1,2)))
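Note that the lists above still hold (val, order) pairs, while the desired output keeps only the values. The remaining step is a projection: after sorting each list by order, map each pair down to its first element (on the RDD this would be rDD2.map(x => (x._1, x._2.sortBy(_._2).map(_._1)))). A minimal sketch of that per-key logic in plain Scala collections, no Spark required, using the grouped data from above:

```scala
// Grouped data as produced by reduceByKey: (id1, id2) -> List((val, order))
val grouped: Map[(Int, Int), List[(Int, Int)]] = Map(
  (51, 113) -> List((1, 1)),
  (51, 114) -> List((1, 2), (2, 1)),
  (50, 113) -> List((1, 3), (1, 1), (2, 2))
)

// Sort each list ascending by order (second element),
// then keep only the val (first element)
val valueLists = grouped.map { case (key, pairs) =>
  key -> pairs.sortBy(_._2).map(_._1)
}

valueLists.foreach(println)
```

This yields List(1) for (51,113), List(2, 1) for (51,114), and List(1, 2, 1) for (50,113), matching the output the question asks for.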

Regarding scala - Spark Scala: Aggregate DataFrame Column Values into an Ordered List, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/40295107/
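As a side note, on Spark 2.x and later the same result can be obtained without dropping to RDDs, using collect_list over structs; sort_array orders an array of structs by the first struct field, so placing order first sorts each group's pairs. This is a hedged sketch of that approach (the column and app names are illustrative, not from the original post):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("ordered-list").master("local[*]").getOrCreate()
import spark.implicits._

val df = Seq((50, 113, 1, 1), (50, 113, 1, 3), (50, 113, 2, 2),
             (51, 114, 1, 2), (51, 114, 2, 1), (51, 113, 1, 1))
  .toDF("id1", "id2", "val", "order")

// Collect (order, val) structs per key; sort_array orders them by the
// first struct field (order); then project out just the val field.
val result = df
  .groupBy("id1", "id2")
  .agg(sort_array(collect_list(struct($"order", $"val"))).as("pairs"))
  .select($"id1", $"id2", $"pairs.val".as("value_list"))

result.show(false)
```

This keeps the computation inside the DataFrame API, which lets Catalyst optimize it, at the cost of the intermediate struct column.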
