gpt4 book ai didi

python - UDF 在 PySpark 中运行两次

转载 作者:行者123 更新时间:2023-12-05 06:23:58 24 4
gpt4 key购买 nike

我有一个简单的 spark 数据框,它有两列,都是字符串;一个名为 id,另一个名为 name。我还有一个名为 string_replacement 的 Python 函数,它可以执行一些字符串操作。我定义了一个包装器 UDF,它包含 string_replacement 并应用于数据框的每一行。只有 name 列被传递给字符串操作函数。这是代码

# Import libraries
from pyspark.sql import *
import pyspark.sql.functions as f
from pyspark.sql.types import *

# Create Example Dataframe

row1 = Row(id='123456', name='Computer Science')

df = spark.createDataFrame([row1])

# Print the dataframe
df.show()

# Define function that does some string operations
def string_replacement(input_string):
string=input_string
string=string.replace('Computer', 'Computer x')
string=string.replace('Science', 'Science x')
return string


# Define wrapper function to turn into UFD

def wrapper_func(row):
temp=row[1]
temp=string_replacement(temp)
row[1]=temp

return row


# Create the schema for the resulting data frame
output_schema = StructType([StructField('id', StringType(), True),
StructField('name', StringType(), True)])


# UDF to apply the wrapper function to the dataframe
new_udf=f.udf(lambda z: wrapper_func(z), output_schema)

cols=df.columns
new_df=df.select(new_udf(f.array(cols)).alias('results')).select(f.col('results.*'))

new_df.show(truncate = False)

该函数接受单词 Computer 并将其转换为 Computer x。它对单词 Science 也是如此。

原始数据框是这样的

+------+----------------+
| id| name|
+------+----------------+
|123456|Computer Science|
+------+----------------+

应用函数后的样子

+------+------------------------+
|id |name |
+------+------------------------+
|123456|Computer x x Science x x|
+------+------------------------+

正如您可以从 x x 中看出的那样,它已经运行了该函数两次。第二次在第一次运行的输出上。 如何避免这种行为?

有趣的是,如果我不分解结果数据框,它看起来不错:

new_df=df.select(new_udf(f.array(cols)).alias('results'))

给你

+-----------------------------+
|results |
+-----------------------------+
|[123456,Computer x Science x]|
+-----------------------------+

最佳答案

使用星形展开似乎会导致为每个展开的元素运行一次 UDF,如此处所示。

df.select(new_udf(F.array(cols)).alias('results')).select(F.col('results.*')).explain()

# == Physical Plan ==
# *(1) Project [pythonUDF1#109.id AS id#104, pythonUDF1#109.name AS name#105]
# +- BatchEvalPython [<lambda>(array(id#0, name#1)), <lambda>(array(id#0, name#1))], [id#0, name#1, pythonUDF0#108, pythonUDF1#109]
# +- Scan ExistingRDD[id#0,name#1]

如果你想保持你现在的代码结构,你可以通过将它包装在一个数组中并进行爆炸来解决问题。

df.select(F.explode(F.array(new_udf(F.array(cols)))).alias('results')).select(F.col('results.*')).show(truncate=False)

# +------+--------------------+
# |id |name |
# +------+--------------------+
# |123456|Computer x Science x|
# +------+--------------------+

根据您的用例,如果您可以以这种方式重新实现 UDF,即每行仅处理特定列而不是整行,则代码更具可读性。

def rep_str(string):
res = string
res = res.replace('Computer', 'Computer x')
res = res.replace('Science', 'Science x')
return res

rep_str_udf = F.udf(lambda s: rep_str(s), StringType())

df.withColumn('new_name', rep_str_udf(df.name)).show()

# +------+----------------+--------------------+
# | id| name| new_name|
# +------+----------------+--------------------+
# |123456|Computer Science|Computer x Science x|
# +------+----------------+--------------------+

关于python - UDF 在 PySpark 中运行两次,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/57978579/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com