gpt4 book ai didi

python - Pyspark:将多个数组列拆分为行

转载 作者:IT老高 更新时间:2023-10-28 20:36:44 25 4
gpt4 key购买 nike

我有一个数据框,它有一行和几列。一些列是单个值,而其他列是列表。所有列表列的长度相同。我想将每个列表列拆分为单独的行,同时保持所有非列表列不变。

样本 DF:

from pyspark import Row
from pyspark.sql import SQLContext
from pyspark.sql.functions import explode

sqlc = SQLContext(sc)

df = sqlc.createDataFrame([Row(a=1, b=[1,2,3],c=[7,8,9], d='foo')])
# +---+---------+---------+---+
# | a| b| c| d|
# +---+---------+---------+---+
# | 1|[1, 2, 3]|[7, 8, 9]|foo|
# +---+---------+---------+---+

我想要什么:

+---+---+----+------+
| a| b| c | d |
+---+---+----+------+
| 1| 1| 7 | foo |
| 1| 2| 8 | foo |
| 1| 3| 9 | foo |
+---+---+----+------+

如果我只有一个列表列,只需执行 explode 即可轻松完成:

df_exploded = df.withColumn('b', explode('b'))
# >>> df_exploded.show()
# +---+---+---------+---+
# | a| b| c| d|
# +---+---+---------+---+
# | 1| 1|[7, 8, 9]|foo|
# | 1| 2|[7, 8, 9]|foo|
# | 1| 3|[7, 8, 9]|foo|
# +---+---+---------+---+

但是,如果我尝试也 explode c 列,我最终会得到一个长度为我想要的平方的数据框:

df_exploded_again = df_exploded.withColumn('c', explode('c'))
# >>> df_exploded_again.show()
# +---+---+---+---+
# | a| b| c| d|
# +---+---+---+---+
# | 1| 1| 7|foo|
# | 1| 1| 8|foo|
# | 1| 1| 9|foo|
# | 1| 2| 7|foo|
# | 1| 2| 8|foo|
# | 1| 2| 9|foo|
# | 1| 3| 7|foo|
# | 1| 3| 8|foo|
# | 1| 3| 9|foo|
# +---+---+---+---+

我想要的是 - 对于每一列,获取该列中数组的第 n 个元素并将其添加到新行中。我已经尝试在数据框中的所有列上映射一个爆炸,但这似乎也不起作用:

df_split = df.rdd.map(lambda col: df.withColumn(col, explode(col))).toDF()

最佳答案

Spark >= 2.4

您可以将 zip_ udf 替换为 arrays_zip 函数

from pyspark.sql.functions import arrays_zip, col, explode

(df
.withColumn("tmp", arrays_zip("b", "c"))
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.b"), col("tmp.c"), "d"))

Spark <2.4

使用 DataFrames 和 UDF:

from pyspark.sql.types import ArrayType, StructType, StructField, IntegerType
from pyspark.sql.functions import col, udf, explode

zip_ = udf(
lambda x, y: list(zip(x, y)),
ArrayType(StructType([
# Adjust types to reflect data types
StructField("first", IntegerType()),
StructField("second", IntegerType())
]))
)

(df
.withColumn("tmp", zip_("b", "c"))
# UDF output cannot be directly passed to explode
.withColumn("tmp", explode("tmp"))
.select("a", col("tmp.first").alias("b"), col("tmp.second").alias("c"), "d"))

使用RDD:

(df
.rdd
.flatMap(lambda row: [(row.a, b, c, row.d) for b, c in zip(row.b, row.c)])
.toDF(["a", "b", "c", "d"]))

由于 Python 通信开销,这两种解决方案都效率低下。如果数据大小是固定的,你可以这样做:

from functools import reduce
from pyspark.sql import DataFrame

# Length of array
n = 3

# For legacy Python you'll need a separate function
# in place of method accessor
reduce(
DataFrame.unionAll,
(df.select("a", col("b").getItem(i), col("c").getItem(i), "d")
for i in range(n))
).toDF("a", "b", "c", "d")

甚至:

from pyspark.sql.functions import array, struct

# SQL level zip of arrays of known size
# followed by explode
tmp = explode(array(*[
struct(col("b").getItem(i).alias("b"), col("c").getItem(i).alias("c"))
for i in range(n)
]))

(df
.withColumn("tmp", tmp)
.select("a", col("tmp").getItem("b"), col("tmp").getItem("c"), "d"))

与 UDF 或 RDD 相比,这应该明显更快。泛化为支持任意数量的列:

# This uses keyword only arguments
# If you use legacy Python you'll have to change signature
# Body of the function can stay the same
def zip_and_explode(*colnames, n):
return explode(array(*[
struct(*[col(c).getItem(i).alias(c) for c in colnames])
for i in range(n)
]))

df.withColumn("tmp", zip_and_explode("b", "c", n=3))

关于python - Pyspark:将多个数组列拆分为行,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/41027315/

25 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com