gpt4 book ai didi

pyspark - 使用原始数据框加入/展开 mapType 列

转载 作者:行者123 更新时间:2023-12-02 01:06:40 25 4
gpt4 key购买 nike

我在 (py)Spark 中有一个数据框,其中 1 列来自“ map ”类型。我想将那一列展平或分成多列,这些列应该添加到原始数据框中。我可以使用 flatMap 展开列,但是我松开了将新数据框(从展开的列)与原始数据框连接起来的键。

我的架构是这样的:

    rroot
|-- key: string (nullable = true)
|-- metric: map (nullable = false)
| |-- key: string
| |-- value: float (valueContainsNull = true)

如您所见,“指标”列是一个映射字段。这是我要展平的专栏。在展平之前它看起来像:

+----+---------------------------------------------------+
|key |metric |
+----+---------------------------------------------------+
|123k|Map(metric1 -> 1.3, metric2 -> 6.3, metric3 -> 7.6)|
|d23d|Map(metric1 -> 1.5, metric2 -> 2.0, metric3 -> 2.2)|
|as3d|Map(metric1 -> 2.2, metric2 -> 4.3, metric3 -> 9.0)|
+----+---------------------------------------------------+

将该字段转换为我做的列

df2.select('metric').rdd.flatMap(lambda x: x).toDF().show()

给出

   +------------------+-----------------+-----------------+
| metric1| metric2| metric3|
+------------------+-----------------+-----------------+
|1.2999999523162842|6.300000190734863|7.599999904632568|
| 1.5| 2.0|2.200000047683716|
| 2.200000047683716|4.300000190734863| 9.0|
+------------------+-----------------+-----------------+

但是我没有看到 key ,因此我不知道如何将此数据添加到原始数据框中。

我想要的是:

+----+-------+-------+-------+
| key|metric1|metric2|metric3|
+----+-------+-------+-------+
|123k| 1.3| 6.3| 7.6|
|d23d| 1.5| 2.0| 2.2|
|as3d| 2.2| 4.3| 9.0|
+----+-------+-------+-------+

因此我的问题是:如何将 df2 恢复为 df(假设我最初不知道 df,只有 df2)

制作 df2:

rdd = sc.parallelize([('123k', 1.3, 6.3, 7.6),
('d23d', 1.5, 2.0, 2.2),
('as3d', 2.2, 4.3, 9.0)
])
schema = StructType([StructField('key', StringType(), True),
StructField('metric1', FloatType(), True),
StructField('metric2', FloatType(), True),
StructField('metric3', FloatType(), True)])
df = sqlContext.createDataFrame(rdd, schema)


from pyspark.sql.functions import lit, col, create_map
from itertools import chain

metric = create_map(list(chain(*(
(lit(name), col(name)) for name in df.columns if "metric" in name
)))).alias("metric")


df2 = df.select("key", metric)

最佳答案

from pyspark.sql.functions import explode

# fetch column names of the original dataframe from keys of MapType 'metric' column
col_names = df2.select(explode("metric")).select("key").distinct().sort("key").rdd.flatMap(lambda x: x).collect()

exprs = [col("key")] + [col("metric").getItem(k).alias(k) for k in col_names]
df2_to_original_df = df2.select(*exprs)
df2_to_original_df.show()

输出是:

+----+-------+-------+-------+
| key|metric1|metric2|metric3|
+----+-------+-------+-------+
|123k| 1.3| 6.3| 7.6|
|d23d| 1.5| 2.0| 2.2|
|as3d| 2.2| 4.3| 9.0|
+----+-------+-------+-------+

关于pyspark - 使用原始数据框加入/展开 mapType 列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/47228223/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com