gpt4 book ai didi

python - 如何在 Spark SQL 中压缩两个数组列

转载 作者:行者123 更新时间:2023-12-03 15:11:54 26 4
gpt4 key购买 nike

我有一个 Pandas 数据框。我试图首先将包含字符串值的两列连接到一个列表中,然后使用 zip,我用“_”连接了列表的每个元素。我的数据集如下:

df['column_1']: 'abc, def, ghi'
df['column_2']: '1.0, 2.0, 3.0'

我想将这两列加入第三列中,如下所示,用于我的数据框的每一行。

df['column_3']: [abc_1.0, def_2.0, ghi_3.0]

我已经使用下面的代码在 python 中成功地做到了这一点,但数据帧非常大,需要很长时间才能为整个数据帧运行它。我想在 PySpark 中做同样的事情以提高效率。我已经成功读取了 spark 数据帧中的数据,但是我很难确定如何使用 PySpark 等效函数复制 Pandas 函数。如何在 PySpark 中获得我想要的结果?
df['column_3'] = df['column_2']
for index, row in df.iterrows():
while index < 3:
if isinstance(row['column_1'], str):
row['column_1'] = list(row['column_1'].split(','))
row['column_2'] = list(row['column_2'].split(','))
row['column_3'] = ['_'.join(map(str, i)) for i in zip(list(row['column_1']), list(row['column_2']))]

我已使用以下代码将两列转换为 PySpark 中的数组
from pyspark.sql.types import ArrayType, IntegerType, StringType
from pyspark.sql.functions import col, split

crash.withColumn("column_1",
split(col("column_1"), ",\s*").cast(ArrayType(StringType())).alias("column_1")
)
crash.withColumn("column_2",
split(col("column_2"), ",\s*").cast(ArrayType(StringType())).alias("column_2")
)

现在我需要的是使用“_”压缩两列中数组的每个元素。我该如何使用 zip 呢?任何帮助表示赞赏。

最佳答案

与 Python 等效的 Spark SQL 将是 pyspark.sql.functions.arrays_zip :

pyspark.sql.functions.arrays_zip(*cols)

Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.


因此,如果您已经有两个数组:
from pyspark.sql.functions import split

df = (spark
.createDataFrame([('abc, def, ghi', '1.0, 2.0, 3.0')])
.toDF("column_1", "column_2")
.withColumn("column_1", split("column_1", "\s*,\s*"))
.withColumn("column_2", split("column_2", "\s*,\s*")))
您可以将其应用于结果
from pyspark.sql.functions import arrays_zip

df_zipped = df.withColumn(
"zipped", arrays_zip("column_1", "column_2")
)

df_zipped.select("zipped").show(truncate=False)
+------------------------------------+
|zipped |
+------------------------------------+
|[[abc, 1.0], [def, 2.0], [ghi, 3.0]]|
+------------------------------------+
现在将结果结合起来,您可以 transform ( How to use transform higher-order function? , TypeError: Column is not iterable - How to iterate over ArrayType()? ):
df_zipped_concat = df_zipped.withColumn(
"zipped_concat",
expr("transform(zipped, x -> concat_ws('_', x.column_1, x.column_2))")
)

df_zipped_concat.select("zipped_concat").show(truncate=False)
+---------------------------+
|zipped_concat |
+---------------------------+
|[abc_1.0, def_2.0, ghi_3.0]|
+---------------------------+
备注 :
高阶函数 transformarrays_zip已在 Apache Spark 2.4 中引入。

关于python - 如何在 Spark SQL 中压缩两个数组列,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/54282706/

26 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com