gpt4 book ai didi

python - Pyspark UDF 中自定义 Python 对象的使用

转载 作者:行者123 更新时间:2023-11-30 22:28:13 28 4
gpt4 key购买 nike

运行以下 PySpark 代码时:

nlp = NLPFunctions()

def parse_ingredients(ingredient_lines):
parsed_ingredients = nlp.getingredients_bulk(ingredient_lines)[0]
return list(chain.from_iterable(parsed_ingredients))


udf_parse_ingredients = UserDefinedFunction(parse_ingredients, ArrayType(StringType()))

我收到以下错误:_pickle.PicklingError:无法序列化对象:TypeError:无法pickle _thread.lock对象

我想这是因为 PySpark 无法序列化这个自定义类。但是,如何避免在每次运行 parse_ingredients_line 函数时实例化这个昂贵的对象的开销呢?

最佳答案

假设您想使用像这样定义的 Identity 类 (identity.py):

class Identity(object):                   
def __getstate__(self):
raise NotImplementedError("Not serializable")

def identity(self, x):
return x

例如,您可以使用可调用对象 (f.py) 并将 Identity 实例存储为类成员:

from identity import Identity

class F(object):
identity = None

def __call__(self, x):
if not F.identity:
F.identity = Identity()
return F.identity.identity(x)

并按如下所示使用它们:

from pyspark.sql.functions import udf
import f

sc.addPyFile("identity.py")
sc.addPyFile("f.py")

f_ = udf(f.F())

spark.range(3).select(f_("id")).show()
+-----+
|F(id)|
+-----+
| 0|
| 1|
| 2|
+-----+

或独立函数和闭包:

from pyspark.sql.functions import udf
import identity

sc.addPyFile("identity.py")

def f():
dict_ = {}
@udf()
def f_(x):
if "identity" not in dict_:
dict_["identity"] = identity.Identity()
return dict_["identity"].identity(x)
return f_


spark.range(3).select(f()("id")).show()
+------+
|f_(id)|
+------+
| 0|
| 1|
| 2|
+------+

关于python - Pyspark UDF 中自定义 Python 对象的使用,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46692370/

28 4 0
Copyright 2021 - 2024 cfsdn All Rights Reserved 蜀ICP备2022000587号
广告合作:1813099741@qq.com 6ren.com