
python - ML function as a pyspark UDF

Reposted · Author: 行者123 · Updated: 2023-12-04 12:58:38

I am fairly new to pyspark and python. I am trying to run an ML function as a pyspark UDF.

Here is an example:

from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import StringType

df = spark.createDataFrame(['Bob has a dog. He loves him'], StringType())

def parse(text):
    import spacy
    import neuralcoref
    nlp = spacy.load('en_core_web_sm')
    # Let's try before using the conversion dictionary:
    neuralcoref.add_to_pipe(nlp)
    doc = nlp(text)
    return doc._.coref_resolved

pd_udf = pandas_udf(parse, returnType=StringType())

df.select(pd_udf(col("value"))).show()

I get this error:
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 377, in main
    process()
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 372, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/serializers.py", line 286, in dump_stream
    for series in iterator:
  File "<string>", line 1, in <lambda>
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 101, in <lambda>
    return lambda *a: (verify_result_length(*a), arrow_return_type)
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 92, in verify_result_length
    result = f(*a)
  File "/home/user/tools/spark-2.4.3-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/util.py", line 99, in wrapper
    return f(*args, **kwargs)
  File "<stdin>", line 7, in parse
  File "/home/user/anaconda3/lib/python3.7/site-packages/spacy/language.py", line 377, in __call__
    doc = self.make_doc(text)
  File "/home/user/anaconda3/lib/python3.7/site-packages/spacy/language.py", line 401, in make_doc
    return self.tokenizer(text)
TypeError: Argument 'string' has incorrect type (expected str, got Series)


Is it possible to run this code on Pyspark?

Best answer

Hi, I ran into a lot of trouble setting up spacy and neuralcoref myself, so I replaced the nlp step with a stand-in function that reverses the string.
But the key point here is that when you pass col("value") to pd_udf, the function receives a pd.Series, not a single string. So your parse function should be defined to accept that type, like this:
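The type mismatch can be reproduced without Spark at all: a pandas UDF hands the function one pd.Series per batch, so a scalar function has to be applied element-wise. A minimal sketch in plain pandas:

```python
import pandas as pd

def reverse(text):
    # scalar function: expects one str, like the spaCy pipeline does
    return text[::-1]

batch = pd.Series(["Bob has a dog. He loves him"])

# reverse(batch) would slice the Series itself (reversing row order),
# not each string -- the same shape of bug as the TypeError above.
resolved = batch.apply(reverse)  # apply the scalar function per element
print(resolved[0])  # mih sevol eH .god a sah boB
```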

import pandas as pd
from pyspark.sql.functions import pandas_udf

def nlp(text):
    return text[::-1]

@pandas_udf("string")
def parse(text: pd.Series) -> pd.Series:
    text = text.apply(nlp)
    return text

df = spark.createDataFrame([("Bob has a dog. He loves him",), ("dog jumps over the fox",)], ("my_text",))
df.select(parse("my_text")).show()
This gives the following result:
+--------------------+
| parse(my_text)|
+--------------------+
|mih sevol eH .god...|
|xof eht revo spmu...|
+--------------------+
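To put the original spaCy code back in, the same shape applies: do the expensive model setup once inside the UDF body, then apply the resulting callable per element with Series.apply. Below is a sketch with a toy stand-in pipeline, since the real thing assumes spacy, neuralcoref and en_core_web_sm are installed on every worker:

```python
import pandas as pd

def make_pipeline():
    # stand-in for spacy.load('en_core_web_sm') + neuralcoref.add_to_pipe(nlp):
    # expensive setup that should run once per batch, not once per row
    return lambda text: text.replace("He", "Bob").replace("him", "the dog")

def parse_batch(texts: pd.Series) -> pd.Series:
    nlp = make_pipeline()   # loaded once for the whole Series
    return texts.apply(nlp)  # applied element-wise, one str at a time

out = parse_batch(pd.Series(["Bob has a dog. He loves him"]))
print(out[0])  # Bob has a dog. Bob loves the dog
```

Decorating parse_batch with @pandas_udf("string") would make it usable in df.select exactly like the reversal example above; the per-batch model load is what keeps the UDF from reloading spaCy for every row.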

Regarding python - ML function as a pyspark UDF, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/62414109/
