
python - Pickle error with Spark-Submit: "_pickle.PicklingError: args[0] from __newobj__ args has the wrong class"


When trying to run some code through Spark-Submit or Zeppelin, I get the following error: "_pickle.PicklingError: args[0] from __newobj__ args has the wrong class"

I have looked through posts with the same problem, but they didn't shed much light on this issue.

The traceback (included below) points to one of the UDFs I use:

udf_stop_words = udf(stop_words, ArrayType(StringType()))

def stop_words(words):
    return list(word.lower() for word in words if word.lower() not in stopwords.words("english"))

Both the input and the output of the function are lists of strings. Here are 3 rows from the input:

[Row(split_tokenized_activity_description=['A', 'delightful', '45', 'minute', 'Swedish', 'style', 'massage']), Row(split_tokenized_activity_description=['A', 'more', 'intense', '45', 'minute', 'version', 'of', 'a', 'Swedish', 'style', 'massage']), Row(split_tokenized_activity_description=['A', 'relaxing', '45', 'minute', 'Swedish', 'style', 'massage'])]
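With a small stand-in stopword list (the real nltk corpus is external to this snippet, so just a few entries are assumed here for illustration), the function's behavior on the first row looks like this:

```python
# Stand-in for stopwords.words("english") -- assumed entries, for illustration only
english = {"a", "of", "the"}

def stop_words(words):
    return list(word.lower() for word in words if word.lower() not in english)

print(stop_words(['A', 'delightful', '45', 'minute', 'Swedish', 'style', 'massage']))
# → ['delightful', '45', 'minute', 'swedish', 'style', 'massage']
```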

Here is the snippet of code I'm using:

def special_car(x):
    # remove the special characters and replace them with the stop word " " (space)
    return [re.sub('[^A-Za-z0-9]+', ' ', x)]

# Create UDF from function
udf_special_car = udf(special_car, ArrayType(StringType()))

# Function to remove stops words
def stop_words(words):
    return list(word.lower() for word in words if word.lower() not in stopwords.words("english"))

udf_stop_words = udf(stop_words, ArrayType(StringType()))

# Load in data
df_tags = spark.sql("select * from database")

# Remove special Characters
df1_tags = df_tags.withColumn('tokenized_name', udf_special_car(df_tags.name))
df2_tags = df1_tags.withColumn('tokenized_description', udf_special_car(df1_tags.description))

# Select only relevant columns
df3_tags = df2_tags.select(['tag_id', 'tokenized_name', 'tokenized_description'])

# Tokenize tag_name and tag_desc (separate on spaces) (this uses the pyspark.sql.functions.split function)
df4_tags = df3_tags.withColumn('split_tokenized_name', split(df3_tags['tokenized_name'].getItem(0), ' '))
df5_tags = df4_tags.withColumn('split_tokenized_description', split(df3_tags['tokenized_description'].getItem(0), ' '))

# Select only relevant columns
df6_tags = df5_tags.select(['tag_id', 'split_tokenized_name', 'split_tokenized_description'])

# Remove Stop words
df7_tags = df6_tags.withColumn('stop_words_tokenized_name', udf_stop_words(df6_tags.split_tokenized_name))
df8_tags = df7_tags.withColumn('stop_words_tokenized_description', udf_stop_words(df7_tags.split_tokenized_description))
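Run outside Spark, the first two steps of this pipeline can be traced on a single sample string (a sketch covering only `special_car` and the space split; the sample sentence is assumed, and note the trailing empty token that the substituted punctuation leaves behind):

```python
import re

def special_car(x):
    # remove the special characters and replace them with a space
    return [re.sub('[^A-Za-z0-9]+', ' ', x)]

name = "A delightful 45-minute Swedish-style massage!"
tokenized = special_car(name)           # one-element list, as the UDF returns
split_tokens = tokenized[0].split(' ')  # mirrors split(col.getItem(0), ' ')

print(tokenized)     # ['A delightful 45 minute Swedish style massage ']
print(split_tokens)  # trailing '' comes from the substituted '!'
```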

Oddly, the first two times I ran my code through Zeppelin it gave the error, but after the third attempt it ran fine and the output was what I expected. Zeppelin is only for testing, though; I need this to run through Spark-Submit.

Traceback (most recent call last):
  File "/tmp/testing_test.py", line 262, in <module>
    udf_stop_words = udf(stop_words, ArrayType(StringType()))
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1872, in udf
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1830, in __init__
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1835, in _create_judf
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1815, in _wrap_function
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2359, in _prepare_for_python_RDD
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 460, in dumps
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 703, in dumps
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 147, in dump
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 409, in dump
    self.save(obj)
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 736, in save_tuple
    save(element)
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 248, in save_function
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 296, in save_function_tuple
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 821, in save_dict
    self._batch_setitems(obj.items())
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 852, in _batch_setitems
    save(v)
  File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 521, in save
    self.save_reduce(obj=obj, *rv)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 564, in save_reduce
_pickle.PicklingError: args[0] from __newobj__ args has the wrong class

I have tried several things to fix this, but none of them worked; they all returned the same error.

I tried changing the udf to a one-line lambda function:

udf(lambda words: list(word.lower() for word in words if word.lower() not in stopwords.words('english')), ArrayType(StringType()))

I tried changing the udf to return a string:

udf_stop_words = udf(stop_words, StringType())

and changed the function slightly to match:

def stop_words(words):
    return str(word.lower() for word in words if word.lower() not in stopwords.words('english'))

I tried defining it as a StructType with both of these:

udf_stop_words = udf(stop_words, StructType([StructField("words", ArrayType(StringType()), False)])) 

udf_stop_words = udf(stop_words, StructType([StructField("words", StringType(), False)]))

I also tried several combinations of the above.

Best Answer

The return type should be ArrayType(StringType()).

I'm not sure about this, but the problem may come from nltk not being installed on your nodes (or the stopwords corpus never having been downloaded onto them). Since calling stopwords.words("english") inside the UDF means calling it on the nodes, it may fail because it can't find the corpus there.

Since stopwords.words("english") is just a list, you should call it on the driver and then broadcast it to the nodes:

from nltk.corpus import stopwords
english_stopwords = stopwords.words("english")
sc.broadcast(english_stopwords)

def stop_words(words):
    return list(word.lower() for word in words if word.lower() not in english_stopwords)

from pyspark.sql.types import ArrayType, StringType
import pyspark.sql.functions as psf
udf_stop_words = psf.udf(stop_words, ArrayType(StringType()))
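The key change is that the corpus is read once on the driver, instead of inside the function on every call. The effect can be illustrated locally with a stand-in loader (a sketch: `load_stopwords` is hypothetical, standing in for `stopwords.words("english")`, and the list contents are assumed):

```python
# Counter to observe how often the "corpus" is loaded
calls = {"n": 0}

def load_stopwords():
    # Hypothetical stand-in for an expensive corpus read that may
    # only be guaranteed to work on the driver
    calls["n"] += 1
    return ["a", "of", "the"]

# As in the question: the corpus is (re)loaded inside the function,
# i.e. on the worker side, once per element checked
def stop_words_slow(words):
    return [w.lower() for w in words if w.lower() not in load_stopwords()]

# As in the answer: loaded once up front; the closure then only
# captures a plain list, which serializes cleanly
english_stopwords = load_stopwords()

def stop_words_fast(words):
    return [w.lower() for w in words if w.lower() not in english_stopwords]
```

Running `stop_words_slow` on a 3-word row triggers three more loads, while `stop_words_fast` triggers none beyond the initial one.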

Regarding python - Pickle error with Spark-Submit "_pickle.PicklingError: args[0] from __newobj__ args has the wrong class", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/46878186/
