- android - RelativeLayout 背景可绘制重叠内容
- android - 如何链接 cpufeatures lib 以获取 native android 库?
- java - OnItemClickListener 不起作用,但 OnLongItemClickListener 在自定义 ListView 中起作用
- java - Android 文件转字符串
我正在尝试使用分区任务中的 spark jdbc() 函数写入 MySQL 表,该分区任务是通过执行 foreachPartitions(test) 调用的。然而,我收到了一个选择错误。
我不确定问题是否是由于 spark 已经在任务内部并且 spark 将 write.jdbc() 作为任务本身运行。根据我的理解,这是不允许的?我可以从我的 test() 函数返回列表“行”并在 main 中调用 write.jdbc() 但我宁愿不必将数据结构收集回主控。代码和错误:
代码:
def test(partition_iter):
row = []
row.append({'col1': 26, 'col2': 12, 'col2': 153.49353894392, 'col4': 1})
df_row = SPARK.createDataFrame(row)
df_row.write.jdbc(url="jdbc:mysql://rds-url/db_name", table="db_name", properties={"driver":"com.mysql.jdbc.Driver","user":"user", "password":"password"}, mode="append")
def main():
SPARK.sparkcontext.parallelize([1, 2, 3, 4]).foreachPartition(test)
main()
错误:
Traceback (most recent call last):
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 107, in dump
return Pickler.dump(self, obj)
File "/usr/lib64/python2.7/pickle.py", line 224, in dump
self.save(obj)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 568, in save_tuple
save(element)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 214, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 251, in save_function_tuple
save((code, closure, base_globals))
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 554, in save_tuple
save(element)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 606, in save_list
self._batch_appends(iter(obj))
File "/usr/lib64/python2.7/pickle.py", line 642, in _batch_appends
save(tmp[0])
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 208, in save_function
self.save_function_tuple(obj)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 256, in save_function_tuple
save(f_globals)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 692, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 563, in save_reduce
save(state)
File "/usr/lib64/python2.7/pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "/usr/lib64/python2.7/pickle.py", line 655, in save_dict
self._batch_setitems(obj.iteritems())
File "/usr/lib64/python2.7/pickle.py", line 687, in _batch_setitems
save(v)
File "/usr/lib64/python2.7/pickle.py", line 306, in save
rv = reduce(self.proto)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/lib/spark/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py", line 323, in get_return_value
format(target_id, ".", name, value))
Py4JError: An error occurred while calling o47.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 11, in main
File "/usr/lib/spark/python/pyspark/rdd.py", line 809, in collect
port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
File "/usr/lib/spark/python/pyspark/rdd.py", line 2439, in _jrdd
self._jrdd_deserializer, profiler)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2372, in _wrap_function
pickled_command, broadcast_vars, env, includes = _prepare_for_python_RDD(sc, command)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2358, in _prepare_for_python_RDD
pickled_command = ser.dumps(command)
File "/usr/lib/spark/python/pyspark/serializers.py", line 440, in dumps
return cloudpickle.dumps(obj, 2)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 667, in dumps
cp.dump(obj)
File "/usr/lib/spark/python/pyspark/cloudpickle.py", line 120, in dump
raise pickle.PicklingError(msg)
pickle.PicklingError: Could not serialize object: Py4JError: An error occurred while calling o47.__getnewargs__. Trace:
py4j.Py4JException: Method __getnewargs__([]) does not exist
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:318)
at py4j.reflection.ReflectionEngine.getMethod(ReflectionEngine.java:326)
at py4j.Gateway.invoke(Gateway.java:272)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
最佳答案
问题是您正在从任务中调用 Spark 上下文,这是不允许的。
编辑以下评论:
您可以通过两种方式对创建要写入数据库的行的工作进行分区:
# use mapPartitions to get an RDD of rows
def test(partition_iter):
rows = [{'col1': 26, 'col2': 12, 'col2': 153.49353894392, 'col4': 1} for i in partition_iter]
return rows
def main():
rows = SPARK.sparkcontext.parallelize([1, 2, 3, 4]).mapPartitions(test)
df = SPARK.createDataFrame(rows)
df.write.jdbc(url="jdbc:mysql://rds-url/db_name", table="db_name", properties={"driver":"com.mysql.jdbc.Driver","user":"user", "password":"password"}, mode="append")
main()
或者:
# use pymysql in each partition instead of spark sql as per your original attempt
def test(partition_iter):
row = []
row.append({'col1': 26, 'col2': 12, 'col2': 153.49353894392, 'col4': 1})
# TODO: pymysql code goes here:
# mysqldriver = ???
def main():
SPARK.sparkcontext.parallelize([1, 2, 3, 4]).foreachPartition(test)
main()
关于hadoop - 无法使用 spark API 写入 MySQL - pickle.PicklingError : Could not serialize object,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/44321158/
我有这个代码 var myChart = new FusionCharts("../themes/clean/charts/hbullet.swf", "myChartId", "400", "75
既然写入是立即进行的(复制到内核缓冲区并返回),那么使用 io_submit 进行写入有什么好处? 事实上,它 (aio/io_submit) 看起来更糟,因为您必须在堆上分配写入缓冲区并且不能使用基
我正在使用 mootool 的 Request.JSON 从 Twitter 检索推文。收到它后,我将写入目标 div 的 .innerHTML 属性。当我在本地将其作为文件进行测试时,即 file:
最终,我想将 Vertica DB 中的数据抓取到 Spark 中,训练机器学习模型,进行预测,并将这些预测存储到另一个 Vertica DB 中。 当前的问题是确定流程最后部分的瓶颈:将 Spark
我使用 WEKA 库编写了一个 Java 程序, 训练分类算法 使用经过训练的算法对未标记的数据集运行预测 将结果写入 .csv 文件 问题在于它当前写出离散分类结果(即算法猜测一行属于哪个类别)。我
背景 - 我正在考虑使用 clickonce 通过 clickonce(通过网站)部署 WinForms 应用程序。相对简单的应用程序的要素是: - 它是一个可执行文件和一个数据库文件(sqlite)
是否有更好的解决方案来快速初始化 C 数组(在堆上创建)?就像我们使用大括号一样 double** matrix_multiply(const double **l_matrix, const dou
我正在读取 JSON 文件,取出值并进行一些更改。 基本上我向数组添加了一些值。之后我想将其写回到文件中。当我将 JSONArray 写回文件时,会被写入字符串而不是 JSONArray 对象。怎样才
我为两个应用程序使用嵌入式数据库,其中一个是服务器,另一个是客户端。客户端应用程序。可以向服务器端发送获取数据请求以检索数据并显示在表格(或其他)中。问题是这样的:如何将获取的数据保存(写入)到页面文
是否有更好的解决方案来快速初始化 C 数组(在堆上创建)?就像我们使用大括号一样 double** matrix_multiply(const double **l_matrix, const dou
从问题得出问题:找到所有 result = new ArrayList(); for (int i = 2; i >(i%8) & 0x1) == 0) { result.add(i
由于某种原因,它没有写入 CSV。谁能明白为什么它不写吗? def main(): list_of_emails = read_email_csv() #read input file, cr
关闭。 这个问题是 not reproducible or was caused by typos 。它目前不接受答案。 这个问题是由于错别字或无法再重现的问题引起的。虽然类似的问题可能在这里出现,
我目前正在开发一个保存和加载程序,但我无法获得正确的结果。 编写程序: #include #include #define FILENAME "Save" #define COUNT 6 type
import java.io.*; public class Main2 { public static void main(String[] args) throws Exception {
我需要使用预定义位置字符串“Office”从所有日历中检索所有 iOS 事件,然后将结果写入 NSLog 和 UITextView。 到目前为止,这是我的代码: #import "ViewCo
我正在尝试将 BOOL 值写入 PFInstallation 中的列,但会不停地崩溃: - (IBAction)pushSwitch:(id)sender { NSUserDefaults *push
我以前在学校学过一些简单的数据库编程,但现在我正在尝试学习最佳实践,因为我正在编写更复杂的应用程序。写入 MySQL 数据库并不难,但我想知道让分布式应用程序写入 Amazon EC2 上的远程数据库
是否可以写回到ResourceBundle?目前我正在使用 ResourceBundle 来存储信息,在运行时使用以下内容读取信息 while(ResourceBundle.getBundle("bu
关闭。这个问题是not reproducible or was caused by typos .它目前不接受答案。 这个问题是由于错别字或无法再重现的问题引起的。虽然类似的问题可能是on-topi
我是一名优秀的程序员,十分优秀!