gpt4 book ai didi

python - Pandas UDF 函数内部无法识别的函数

转载 作者:行者123 更新时间:2023-12-05 06:17:23 24 4
gpt4 key购买 nike

我在 Pyspark 上使用 Pandas UDF。

我有一个主文件 __main_.py 包含:

from pyspark.sql import SparkSession
from run_udf import compute


def main():
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
df = compute(df)
df.show()
spark.stop()


if __name__ == "__main__":
main()

还有一个包含我的 UDF 函数和另一个函数(将单个变量乘以 2)的 run_udf.py 文件:

from pyspark.sql.functions import pandas_udf, PandasUDFType


def multi_by_2(x):
return 2 * x


def compute(df):

@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=multi_by_2(v) - v.mean())

df = df.groupby("id").apply(subtract_mean)

return df

运行 main.py 时出现以下错误:“没有名为‘run_udf’的模块”。在这个配置中,subtract_mean() 似乎没有访问函数 multi_by_2()。我找到了 2 种方法,但不知道它是否遵循最佳实践标准:

方法 1:(将函数移到计算中 - 不太理想,因为我每次使用另一个 pandas_udf() 函数时都会复制函数 - 我们失去了“可重用”函数的概念) .

def compute(df):
def multi_by_2(x):
return 2 * x
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=multi_by_2(v) - v.mean())

df = df.groupby("id").apply(subtract_mean)


return df

方法二:将乘法函数作为参数传入计算。

__main_.py

from pyspark.sql import SparkSession
from run_udf import compute
def multi_by_2(x):
return 2 * x

def main():
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
df = compute(df, multi_by_2)
df.show()
spark.stop()


if __name__ == "__main__":
main()

run_udf.py 从 pyspark.sql.functions 导入 pandas_udf, PandasUDFType

def compute(df, multi_by_2):
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
return pdf.assign(v=multi_by_2(v) - v.mean())

df = df.groupby("id").apply(subtract_mean)


return df

我找到的两个解决方案似乎有点老套。有没有更好的方法来解决这个问题?

最佳答案

我知道这个回复会在您发布问题后一段时间出现,但我希望它仍然对您有所帮助!

您想将其包装在嵌套函数中的原因是什么?此外,据我所知,使用 spark 数据帧作为参数调用函数并不常见,因此也许您可以在主脚本中尝试类似以下内容:

from pyspark.sql import SparkSession
from run_udf import substract_mean_udf

def main():
spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
df =df.groupby("id").apply(subtract_mean_udf)
df.show()
spark.stop()

if __name__ == "__main__":
main()

以及 run_udf.py 脚本的以下内容:

from pyspark.sql.functions import pandas_udf, PandasUDFType

def multi_by_2(x):
return 2 * x

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def subtract_mean_udf(pdf):
# pdf is a pandas.DataFrame
return pdf.assign(v=multi_by_2(pdf.v) - pdf.v.mean())

大部分信息取自关于 Pandas UDF 的 Databricks 笔记本。

你也可以逃脱

return pdf.assign(v=pdf.v*2 - pdf.v.mean())

但我还没有测试过,所以我不能 100% 确定。

关于python - Pandas UDF 函数内部无法识别的函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/61705498/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com