gpt4 book ai didi

python - 从 Pyspark UDF 调用另一个自定义 Python 函数

转载 作者:太空宇宙 更新时间:2023-11-04 04:14:29 24 4
gpt4 key购买 nike

假设您有一个文件,我们将其命名为 udfs.py 并在其中:

def nested_f(x):
return x + 1

def main_f(x):
return nested_f(x) + 1

然后您想要从 main_f 函数创建一个 UDF 并在数据帧上运行它:

import pyspark.sql.functions as fn
import pandas as pd

pdf = pd.DataFrame([[1], [2], [3]], columns=['x'])
df = spark.createDataFrame(pdf)

_udf = fn.udf(main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

如果我们在定义这两个函数的同一文件 (udfs.py) 中执行此操作,则可以正常工作。但是,尝试从不同的文件(例如main.py)执行此操作会产生错误ModuleNotFoundError: No module named ...:

...
import udfs

_udf = fn.udf(udfs.main_f, 'int')
df.withColumn('x1', _udf(df['x'])).show()

我注意到,如果我实际上嵌套 nested_fmain_f 中,如下所示:

def main_f(x):
def nested_f(x):
return x + 1

return nested_f(x) + 1

一切正常。但是,我的目标是将逻辑很好地分离到多个函数中,我也可以单独测试它们。

认为这可以通过使用 spark.sparkContext.addPyFile( '...udfs.py')。然而:

  1. 我觉得这有点啰嗦(尤其是如果您需要压缩文件夹等...)
  2. 这并不总是那么容易/可行(例如 udfs.py 可能正在使用许多其他模块,然后也需要提交这些模块,从而导致一些链式 react ...)
  3. addPyFile 还有其他一些不便之处(例如 autoreload can stop working 等)

所以问题是:有没有办法同时做所有这些:

  • 将 UDF 的逻辑很好地拆分为几个 Python 函数
  • 使用不同于定义逻辑的文件中的 UDF
  • 不需要使用 addPyFile 提交任何依赖项

澄清这是如何工作的/为什么这不起作用的奖励积分!

最佳答案

对于较小的(一个或两个本地文件)依赖项,您可以使用 --py-files 并枚举它们,对于更大或更多的依赖项 - 最好将其打包在 zip 或 egg 文件中。

文件udfs.py:

def my_function(*args, **kwargs):
# code

文件 main.py:

from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf
from udfs import my_function

sc = SparkContext()
spark = SparkSession(sc)
my_udf = udf(my_function)

df = spark.createDataFrame([(1, "a"), (2, "b")])
df.withColumn("my_f", my_udf("..."))

对于运行:

pyspark --py-files /path/to/udfs.py
# or
spark-submit --py-files /path/to/udfs.py main.py

如果你已经编写了自己的 Python 模块甚至第三方模块(不需要 C 编译),我个人需要用 geoip2,最好创建一个 zip 或 egg 文件.

# pip with -t install all modules and dependencies in directory `src`
pip install geoip2 -t ./src
# Or from local directory
pip install ./my_module -t ./src

# Best is
pip install -r requirements.txt -t ./src

# If you need add some additionals files
cp ./some_scripts/* ./src/

# And pack it
cd ./src
zip -r ../libs.zip .
cd ..

pyspark --py-files libs.zip
spark-submit --py-files libs.zip

在使用 --py-files< 的 pyspark shell 中使用 pyspark --master yarn(可能与其他非本地 master 选项)时要小心:

>>> import sys
>>> sys.path.insert(0, '/path/to/libs.zip') # You can use relative path: .insert(0, 'libs.zip')
>>> import MyModule # libs.zip/MyModule

编辑 - 关于如何在没有 addPyFile ()--py-files 的情况下在执行器上获取函数的问题的答案:

有必要让一个给定的文件在各个执行器上具有功能。并且可以通过 PATH env 访问。因此,我可能会编写一个 Python 模块,然后将其安装在执行程序上并在环境中可用。

关于python - 从 Pyspark UDF 调用另一个自定义 Python 函数,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/55688664/

24 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com