
java - Spark 2.3 with Java 8: transform rows into columns


I'm new to Spark 2.4 with Java 8 and I need some help. Here is an example:

Source DataFrame

+-----+---------+
| key | Value   |
+-----+---------+
| A   | John    |
| B   | Nick    |
| A   | Mary    |
| B   | Kathy   |
| C   | Sabrina |
| B   | George  |
+-----+---------+

Meta DataFrame

+-----+
| key |
+-----+
| A   |
| B   |
| C   |
| D   |
| E   |
| F   |
+-----+

I would like to transform it into the following: the column names come from the Meta DataFrame, and the rows are built from the values in the Source DataFrame:

+------+--------+---------+------+------+------+
| A    | B      | C       | D    | E    | F    |
+------+--------+---------+------+------+------+
| John | Nick   | Sabrina | null | null | null |
| Mary | Kathy  | null    | null | null | null |
| null | George | null    | null | null | null |
+------+--------+---------+------+------+------+

The code needs to be written in Java 8 for Spark 2.3. Thanks for your help.

Best Answer

To make things clearer (and easier to reproduce), let's define the dataframes:

import spark.implicits._ // needed for toDF (implicit in spark-shell)

val df1 = Seq("A" -> "John", "B" -> "Nick", "A" -> "Mary",
    "B" -> "Kathy", "C" -> "Sabrina", "B" -> "George")
  .toDF("key", "value")
val df2 = Seq("A", "B", "C", "D", "E", "F").toDF("key")

From what I understand, you are trying to create one column per value in the key column of df2. These columns should contain all the values of the value column that are associated with the key naming the column. If we take an example, the first value of column A should be the value of the first occurrence of A (if it exists, null otherwise): "John". Its second value should be the value of the second occurrence of A: "Mary". There is no third occurrence, so the third value of the column should be null.

I spelled it out that way to show that we need a notion of rank of the values for each key (a window function), and then to group by that rank. It goes as follows:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._ // monotonically_increasing_id, rank, first

val df1_win = df1
  .withColumn("id", monotonically_increasing_id)
  .withColumn("rank", rank() over Window.partitionBy("key").orderBy("id"))
// the id is just here to maintain the original order.

// getting the keys in df2. Add distinct if there are duplicates.
val keys = df2.collect.map(_.getAs[String](0)).sorted

// then it's just about pivoting
df1_win
  .groupBy("rank")
  .pivot("key", keys)
  .agg(first('value))
  .orderBy("rank")
  //.drop("rank") // I keep it here for clarity
  .show()
+----+----+------+-------+----+----+----+
|rank|   A|     B|      C|   D|   E|   F|
+----+----+------+-------+----+----+----+
|   1|John|  Nick|Sabrina|null|null|null|
|   2|Mary| Kathy|   null|null|null|null|
|   3|null|George|   null|null|null|null|
+----+----+------+-------+----+----+----+
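Since the question asks for Java, here is first a minimal sketch of how the same two test dataframes could be created in Java. This is an addition for completeness, not part of the original answer; it assumes an existing SparkSession named spark:

import java.util.Arrays;
import java.util.List;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// assumption: `spark` is an already-built SparkSession
StructType sourceSchema = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField("key", DataTypes.StringType, false),
    DataTypes.createStructField("value", DataTypes.StringType, false)));
List<Row> sourceRows = Arrays.asList(
    RowFactory.create("A", "John"), RowFactory.create("B", "Nick"),
    RowFactory.create("A", "Mary"), RowFactory.create("B", "Kathy"),
    RowFactory.create("C", "Sabrina"), RowFactory.create("B", "George"));
Dataset<Row> df1 = spark.createDataFrame(sourceRows, sourceSchema);

StructType metaSchema = DataTypes.createStructType(Arrays.asList(
    DataTypes.createStructField("key", DataTypes.StringType, false)));
List<Row> metaRows = Arrays.asList(
    RowFactory.create("A"), RowFactory.create("B"), RowFactory.create("C"),
    RowFactory.create("D"), RowFactory.create("E"), RowFactory.create("F"));
Dataset<Row> df2 = spark.createDataFrame(metaRows, metaSchema);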

The code is exactly the same in Java:

import java.util.List;
import java.util.stream.Collectors;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.functions;

Dataset<Row> df1_win = df1
    .withColumn("id", functions.monotonically_increasing_id())
    .withColumn("rank", functions.rank().over(Window.partitionBy("key").orderBy("id")));
// the id is just here to maintain the original order.

// getting the keys in df2. Add distinct if there are duplicates.
// Note that it is a List of Objects, to match the (strange) signature of pivot.
List<Object> keys = df2.collectAsList().stream()
    .map(x -> x.getString(0))
    .sorted()
    .collect(Collectors.toList());

// then it's just about pivoting
df1_win
    .groupBy("rank")
    .pivot("key", keys)
    .agg(functions.first(functions.col("value")))
    .orderBy("rank")
    // .drop("rank") // I keep it here for clarity
    .show();
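Passing the keys list explicitly to pivot matters here for two reasons: Spark skips the extra job it would otherwise run to compute the distinct pivot values, and the columns D, E and F, which never occur in df1, are still created because they come from df2. To get exactly the layout asked for, the helper column can then be dropped. A minimal follow-up sketch; the variable name pivoted is hypothetical and stands for the result of the chain above:

// `pivoted` is a hypothetical name for the Dataset produced by the
// groupBy/pivot/agg/orderBy chain above (without the final .show()).
Dataset<Row> result = pivoted.drop("rank");
result.show();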

About java - Spark 2.3 with Java 8: transform rows into columns, a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/58108572/
