- android - 多次调用 OnPrimaryClipChangedListener
- android - 无法更新 RecyclerView 中的 TextView 字段
- android.database.CursorIndexOutOfBoundsException : Index 0 requested, 光标大小为 0
- android - 使用 AppCompat 时,我们是否需要明确指定其 UI 组件(Spinner、EditText)颜色
尝试通过 Spark-Submit 或 Zeppelin 运行某些代码时出现以下错误:“_pickle.PicklingError: __ newobj __ args 中的 args[0] 具有错误的类”
我浏览过有同样问题的帖子,但对这个问题没有太多了解。
回溯(包含在下面)指向我使用的 udfs 之一:
udf_stop_words = udf(stop_words, ArrayType(StringType()))
def stop_words(words):
return list(word.lower() for word in words if word.lower() not in stopwords.words("english"))
函数的输入和输出都是字符串列表。这些是来自输入的 3 行:
[Row(split_tokenized_activity_description=['A', 'delightful', '45', 'minute', 'Swedish', 'style', 'massage']), Row(split_tokenized_activity_description=['A', 'more', 'intense', '45', 'minute', 'version', 'of', 'a', 'Swedish', 'style', 'massage']), Row(split_tokenized_activity_description=['A', 'relaxing', '45', 'minute', 'Swedish', 'style', 'massage'])
这是我正在使用的代码片段。
def special_car(x):
# remove the special character and replace them with the stop word " " (space)
return [re.sub('[^A-Za-z0-9]+', ' ', x)]
# Create UDF from function
udf_special_car = udf(special_car, ArrayType(StringType()))
# Function to remove stops words
def stop_words(words):
return list(word.lower() for word in words if word.lower() not in stopwords.words("english"))
udf_stop_words = udf(stop_words, ArrayType(StringType()))
# Load in data
df_tags = spark.sql("select * from database")
# Remove special Characters
df1_tags = df_tags.withColumn('tokenized_name', udf_special_car(df_tags.name))
df2_tags = df1_tags.withColumn('tokenized_description', udf_special_car(df1_tags.description))
# Select only relevent columns
df3_tags = df2_tags.select(['tag_id', 'tokenized_name', 'tokenized_description'])
# Tokenize tag_name and tag_desc (Seperate on spaces) (This uses the pyspark.sql.split function)
df4_tags = df3_tags.withColumn('split_tokenized_name', split(df3_tags['tokenized_name'].getItem(0), ' '))
df5_tags = df4_tags.withColumn('split_tokenized_description', split(df3_tags['tokenized_description'].getItem(0), ' '))
# Select only relevent columns
df6_tags = df5_tags.select(['tag_id', 'split_tokenized_name', 'split_tokenized_description'])
# Remove Stop words
df7_tags = df6_tags.withColumn('stop_words_tokenized_name', udf_stop_words(df6_tags.split_tokenized_name))
df8_tags = df7_tags.withColumn('stop_words_tokenized_description', udf_stop_words(df7_tags.split_tokenized_description))
奇怪的是,前两次通过 Zeppelin 运行我的代码时出现错误,但在第三次尝试后,它运行得很好,并且输出是我期望的结果。不过,Zeppelin 仅用于测试;我需要让它通过 Spark-Submit 运行。
Traceback (most recent call last): File "/tmp/testing_test.py", line 262, in udf_stop_words = udf(stop_words, ArrayType(StringType())) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1872, in udf File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1830, in init File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1835, in _create_judf File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/sql/functions.py", line 1815, in _wrap_function File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/rdd.py", line 2359, in _prepare_for_python_RDD File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 460, in dumps File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 703, in dumps File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 147, in dump File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 409, in dump self.save(obj) File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 736, in save_tuple save(element) File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 248, in save_function File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 296, in save_function_tuple File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 476, in save f(self, obj) # Call unbound method with explicit self File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 821, in save_dict self._batch_setitems(obj.items()) File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 852, in _batch_setitems save(v) File "/home/hadoop/anaconda/lib/python3.6/pickle.py", line 521, in save self.save_reduce(obj=obj, *rv) File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/cloudpickle.py", line 564, in save_reduce _pickle.PicklingError: args[0] from newobj args has the wrong class
我尝试了多种方法来解决此问题,但都没有奏效。它们都返回相同的错误。
我尝试将 udf 更改为单行 lambda 函数:
udf(lambda words: list(word.lower() for word in words if word.lower() not in stopwords.words('english')), ArrayType(StringType())).
我尝试更改 udf 以返回字符串:
udf_stop_words = udf(stop_words, StringType())
并稍微更改 udf 以匹配。
def stop_words(words):
return str(word.lower() for word in words if word.lower() not in stopwords.words('english'))
我尝试将其定义为具有以下两者的 StructType:
udf_stop_words = udf(stop_words, StructType([StructField("words", ArrayType(StringType()), False)]))
和
udf_stop_words = udf(stop_words, StructType([StructField("words", StringType(), False)])).
我还尝试了上述的多种组合。
最佳答案
返回类型应该是ArrayType(StringType())
。
我对此不确定,但问题可能是由于您的节点(或corpus
)上没有安装nltk
stopwords
从未下载到节点上)。由于在 UDF
内调用 stopwords.words("english")
就像在节点上调用它一样,因此可能会失败,因为它找不到语料库。
由于 stopwords.words("english")
只是一个列表,因此您应该在驱动程序上调用它,然后将其广播到节点:
from nltk.corpus import stopwords
english_stopwords = stopwords.words("english")
sc.broadcast(english_stopwords)
def stop_words(words):
return list(word.lower() for word in words if word.lower() not in english_stopwords)
from pyspark.sql.types import ArrayType, StringType
import pyspark.sql.functions as psf
udf_stop_words = psf.udf(stop_words, ArrayType(StringType()))
关于python - Spark-Submit 的 pickle 错误 "_pickle.PicklingError: args[0] from __newobj__ args has the wrong class",我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/46878186/
我有这个代码。为了让它工作,我必须使用 Args&&... 而不是 Args... 或 Args&... 我注意到 args 从 & 转换为 const& 或 && 转换为 &。 Args...Arg
当我定义类时,我总是去 Class A(object): def __init__(self, arg): self.arg = arg def print_arg(s
假设我想定义两个 {Type} 类的变量。构造函数采用 1 个参数。下面两种方式是否完全等价(编译成相同的目标代码)? Type a(arg), b(arg); 和 Type a(arg); Type
(旁白:我是一名 Perl 程序员,正如您所知,这是我的第一个重要的 Java 程序。简单的术语将不胜感激。) 我有以下启动器作为编码工作: import java.lang.reflect.*; i
Math.nextUp(arg) 始终与 arg + Math.ulp(arg) 相同,还是我遗漏了什么? System.out.println( 0.5 + Math.ulp(0.5));
今天我在学习完美转发,我创建了这个代码示例 #include #include template auto toStdFun(Function&& fun, Args&&...ar
我想知道你会选择哪个选项? putStrLn (show randomNum) putStrLn $ show randomNum (putStrLn . show) randomNum 所有选项在语
我试图在 visual studio 2012 中编译一个库,它最初是用 c++ 为 visual studio 2015 编写的。我有一个错误说 'class' missing tag。 错误消息的
我在下面的代码中遇到了运行时异常ArrayIndexOutOfBoundException,行中: if ( args[0].equals("t") || args[0].equals("time")
我有以下代码 import React, { Component } from "react"; import { Accounts } from "meteor/accounts-base"; ex
这个问题已经有答案了: Difference between Arrays and 3 dots (Varargs) in java (3 个回答) 已关闭 5 年前。 受学校线性代数 I 和 II
所以我定义了一个函数: def getDistnace(self, strings, parentD, nodeName, nodeDistance): 我用它来调用: Node.getDistnac
这个问题在这里已经有了答案: subprocess.call() arguments ignored when using shell=True w/ list [duplicate] (2 个答案
我想将参数传递给 java 应用程序,但喜欢 linux 应用程序风格。 java 中的main 方法对所有参数使用一个String 数组。在 Linux 中,大多数应用程序接受如下参数:ls -l
这是我的代码片段 #include void change(int a[]){ printf("%p\n",&a); } int main(){
我需要使用 python 3.6 subprocess.run() 函数发出以下命令: gsettings set org.gnome.shell enabled-extensions "['appl
这两个函数是否有任何有意义的不同?有什么理由通常更喜欢一个而不是另一个吗? void foo(auto x, auto &... y) { /* ... */ } template void foo(
例如: def m(arg, ...args) { println "arg: $arg" println "args: $args" } m('arg', k:'v') 输出: ar
我对 Java 还很陌生。目前正在尝试将 args[] 中给出的文件名传递给此 FileReader,但当我编译时,它说找不到指定的文件。如果我对文件名进行硬编码,它就可以正常工作。这应该如何运作?
为什么这是一个语法错误??做这件事的合适方法是什么? >>> def f(*args, option=None): File "", line 1 def f(*args, option=
我是一名优秀的程序员,十分优秀!