
python - Transpose a Spark DataFrame from rows to columns in PySpark and append it to another DataFrame


I have a Spark DataFrame avg_length_df in PySpark that looks like this:

+----+---+-----+-----+-----+-------+----------+
|  id|  x|    a|    b|    c|country|     param|
+----+---+-----+-----+-----+-------+----------+
|40.0|9.0|5.284|5.047|6.405|   13.0|avg_length|
+----+---+-----+-----+-----+-------+----------+

I want to transpose it from rows to columns so that it becomes:
+----------+
|avg_length|
+----------+
|      40.0|
|       9.0|
|     5.284|
|     5.047|
|     6.405|
|      13.0|
+----------+

Next, I have a second DataFrame, df2:
+---------+------+
|col_names|dtypes|
+---------+------+
|       id|string|
|        x|   int|
|        a|string|
|        b|string|
|        c|string|
|  country|string|
+---------+------+

I want to create a column avg_length in df2 that is equal to the transposed DataFrame above. So the expected output looks like:
+---------+------+----------+
|col_names|dtypes|avg_length|
+---------+------+----------+
|       id|string|      40.0|
|        x|   int|       9.0|
|        a|string|     5.284|
|        b|string|     5.047|
|        c|string|     6.405|
|  country|string|      13.0|
+---------+------+----------+

How do I accomplish these two operations?

Best Answer

>>> from pyspark.sql import *
>>> from pyspark.sql.functions import *
# Input DataFrame
>>> df.show()
+----+---+-----+-----+-----+-------+----------+
|  id|  x|    a|    b|    c|country|     param|
+----+---+-----+-----+-----+-------+----------+
|40.0|9.0|5.284|5.047|6.405| 13.0|avg_length|
+----+---+-----+-----+-----+-------+----------+

>>> # Pivot on "param" so its value (avg_length) becomes a column, packing
>>> # the row's values into a single JSON string keyed by column name
>>> avgDF = (df.groupBy("id", "x", "a", "b", "c", "country")
...            .pivot("param")
...            .agg(concat_ws("", collect_list(to_json(struct("id", "x", "a", "b", "c", "country")))))
...            .drop("id", "x", "a", "b", "c", "country"))
>>> avgDF.show(2,False)
+----------------------------------------------------------------------------+
|avg_length                                                                   |
+----------------------------------------------------------------------------+
|{"id":"40.0","x":"9.0","a":"5.284","b":"5.047","c":"6.405","country":"13.0"} |
+----------------------------------------------------------------------------+

>>> # Strip the braces and quotes from the JSON string, split it into
>>> # "name:value" pairs, explode to one pair per row, then split each
>>> # pair into its name and value
>>> finalDF = (avgDF
...     .withColumn("value", explode(split(regexp_replace(col("avg_length"), """[\\{ " \\}]""", ""), ",")))
...     .withColumn("avg_length", split(col("value"), ":")[1])
...     .withColumn("col_names", split(col("value"), ":")[0])
...     .drop("value"))
>>> finalDF.show(10,False)
+----------+---------+
|avg_length|col_names|
+----------+---------+
|40.0      |id       |
|9.0       |x        |
|5.284     |a        |
|5.047     |b        |
|6.405     |c        |
|13.0      |country  |
+----------+---------+
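
As an aside, a sketch of an alternative (not part of the original answer): since avg_length holds a JSON object, from_json can parse it into a map that explode turns directly into name/value rows, avoiding the regexp_replace string surgery. The name finalDF_alt is purely illustrative:

>>> from pyspark.sql.types import MapType, StringType
>>> # Alternative sketch: parse the JSON column as a map and explode it
>>> finalDF_alt = avgDF.select(
...     explode(from_json("avg_length", MapType(StringType(), StringType())))
...     .alias("col_names", "avg_length"))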

# Other DataFrame
>>> df2.show()
+---------+------+
|col_names|dtypes|
+---------+------+
|       id|string|
|        x|   int|
|        a|string|
|        b|string|
|        c|string|
|  country|string|
+---------+------+

>>> # Join on col_names to attach the avg_length column to df2
>>> df2.join(finalDF, "col_names").show(10, False)
+---------+------+----------+
|col_names|dtypes|avg_length|
+---------+------+----------+
|id       |string|40.0      |
|x        |int   |9.0       |
|a        |string|5.284     |
|b        |string|5.047     |
|c        |string|6.405     |
|country  |string|13.0      |
+---------+------+----------+
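
For completeness, a simpler alternative sketch (an assumption, not from the original answer): since the input is a single row, a column-name-to-value map can be built with create_map and exploded, skipping the pivot and JSON round-trip entirely. The names cols and finalDF_map are illustrative:

>>> from itertools import chain
>>> cols = ["id", "x", "a", "b", "c", "country"]
>>> # create_map takes alternating key/value column expressions
>>> kv = create_map(*chain.from_iterable(
...     (lit(c), col(c).cast("string")) for c in cols))
>>> finalDF_map = df.select(explode(kv).alias("col_names", "avg_length"))
>>> df2.join(finalDF_map, "col_names").show(10, False)

Either way, the join key is col_names, so the result matches the table above.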

On "python - Transpose a Spark DataFrame from rows to columns in PySpark and append it to another DataFrame", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/58529216/
