- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我需要从我的 Databricks 笔记本将 pyspark Dataframe 发送到 Eventhub。问题发生在这部分代码:
ehWriteConf = {
'eventhubs.connectionString' : EVENT_HUB_CONNECTION_STRING
}
def send_to_eventhub(df:DataFrame):
ds = df.select(struct(*[c for c in df.columns]).alias("body"))\
.select("body")\
.write.format("eventhubs")\
.options(**ehWriteConf)\
.save()
我在对数据帧进行一些处理后调用此方法:
# write feature_df into our EventHub
send_to_eventhub(feature_df)
一些类似的问题表明这是一个库版本问题,因此我已经尝试了我找到的几个答案,例如安装以下库的兼容版本:
com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22
但这是我收到的错误消息:
java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<command-37526120346879> in <module>
5 # write feature_df into our EventHub
6
----> 7 send_to_eventhub(feature_df)
8
9 # implement reading data from EventHub through a loop in print statement
<command-2498519353602292> in send_to_eventhub(df)
34 # .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")\
35 # .format("org.apache.spark.sql.eventhubs.EventHubsSourceProvider")
---> 36 ds = df.select(struct(*[c for c in df.columns]).alias("body"))\
37 .select("body")\
38 .write.format("eventhubs")\
/databricks/spark/python/pyspark/sql/readwriter.py in save(self, path, format, mode, partitionBy, **options)
736 self.format(format)
737 if path is None:
--> 738 self._jwrite.save()
739 else:
740 self._jwrite.save(path)
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
1302
1303 answer = self.gateway_client.send_command(command)
-> 1304 return_value = get_return_value(
1305 answer, self.gateway_client, self.target_id, self.name)
1306
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
115 def deco(*a, **kw):
116 try:
--> 117 return f(*a, **kw)
118 except py4j.protocol.Py4JJavaError as e:
119 converted = convert_exception(e.java_exception)
/databricks/spark/python/lib/py4j-0.10.9.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
324 value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
325 if answer[1] == REFERENCE_TYPE:
--> 326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
328 format(target_id, ".", name), value)
Py4JJavaError: An error occurred while calling o1187.save.
: java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V
at org.apache.spark.sql.eventhubs.EventHubsWriter$.validateQuery(EventHubsWriter.scala:58)
at org.apache.spark.sql.eventhubs.EventHubsWriter$.write(EventHubsWriter.scala:70)
at org.apache.spark.sql.eventhubs.EventHubsSourceProvider.createRelation(EventHubsSourceProvider.scala:124)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:47)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:80)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:78)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:89)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.$anonfun$applyOrElse$1(QueryExecution.scala:160)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:239)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:386)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:186)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:968)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:141)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:336)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:160)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$$nestedInanonfun$eagerlyExecuteCommands$1$1.applyOrElse(QueryExecution.scala:156)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:575)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:167)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:575)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:268)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:264)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:30)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:551)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$eagerlyExecuteCommands$1(QueryExecution.scala:156)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:324)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:156)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:141)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:132)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:186)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:959)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:427)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:396)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:258)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:380)
at py4j.Gateway.invoke(Gateway.java:295)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:251)
at java.lang.Thread.run(Thread.java:748)
问题之一是不太清楚没有找到什么方法。
我运行笔记本的集群详细信息是:
最佳答案
要写入的数据帧需要具有以下架构:
Column | Type
----------------------------------------------
body (required) | string or binary
partitionId (*optional) | string
partitionKey (*optional) | string
这对我有用。
df.withColumn('body', F.to_json(
F.struct(*df.columns),
options={"ignoreNullFields": False}))\
.select('body')\
.write\
.format("eventhubs")\
.options(**ehconf)\
.save()
关于python - 从 Databricks 笔记本向 Azure Eventhubs 发送 Spark 数据帧时出现错误 (java.lang.NoSuchMethodError),我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/73962665/
问题:将具有两个字符分隔符的数据文件加载到 Azure SQL Db 时,我们遇到以下错误。我们可能做错了什么以及如何解决这个问题? 在 Azure Databricks 中使用 Python 笔记本
我正在使用 ipython notebook 迈出第一步,我成功地将它安装在我的远程服务器上(通过 SSH),并使用以下命令启动它: ipython notebook --ip='*' ---pyla
我想知道对于一个网站来说,最好的数据库设计是什么,让用户(假设有 10k 个用户)给自己留下带有日期的注释。用户只能看到自己的笔记。 用户将看到的示例: 2014年9月28日 -去商店 -带狗去散步
我希望没有人会认为这个问题离题。我即将开始探索在 Jupyter 笔记本中使用 C# 内核。我看到有几种选择,有些似乎过时了。我对探索它们并不真正感兴趣,我只是想要一些能够很好地用于演示的东西。目的是
我已经使用此命令在 R 控制台中安装了 Rcpp,该命令直到现在通常用于安装要在 jupyter 笔记本上运行的软件包: install.packages('Rcpp', '/home/user/an
我已经使用更新了 Tornado sudo pip install --upgrade tornado 检查当前版本显示我已经有一个 4.0 以后的版本 找到命令的输出: pip 显示 Tornado
这个问题在这里已经有了答案: How can I share Jupyter notebooks with non-programmers? [closed] (6 个回答) 5年前关闭。 我安装了一
我有一个 IPython 笔记本,我不小心丢弃了一个巨大的输出 (15 MB),导致笔记本崩溃。现在,当我打开笔记本并尝试删除有问题的单元格时,笔记本又崩溃了——从而阻止了我解决问题并将笔记本恢复到稳
当我使用 ipython 笔记本(Windows 版本)运行任何 %R 代码时,输出在结果之前包含一个 480 x 480 的空白图像。我已经申请了fix #2433让 rmagic 在 Wind
大家好,我想使用新的.NET Jupyter笔记本,因此我准备了一个gitt的存储库,其中包含dockerfile(是正确的)和NuGet.config文件,例如: here the tuto 一切都
如何将本地镜像添加到 IJulia 笔记本?该图像与 IPYNB 文件位于同一本地网络文件夹中。该文件夹可通过符号链接(symbolic link)访问。我尝试过相对文件名和绝对文件名。 ![Ima
docs描述如何创建密码来保护您的 jupyter 笔记本。我希望能够创建并共享一个特定的笔记本,并为该笔记本设置特殊的密码。这可能吗? 最佳答案 不,这是不可能的。该密码保护整个 Jupyter 服
最近我了解到,使用 python 可以生成 IPython 笔记本 automatically .这看起来是个很酷的功能,我想用它来自动生成报告。但是用 julia 而不是 python。那么是否有一
我正在使用 Jupyter 笔记本,并且需要运行另一个 Jupyter 笔记本。这通常使用 %run 很简单,但另一个笔记本的路径是相对的并且包含空格。 这会产生以下错误: %run '..//../
我想在一个循环中运行完整的 Jupyter 笔记本,为笔记本的每次运行传递不同的参数。我可以使用插件传递参数,如下所述:Passing command line arguments to argv i
我真的很感谢这里的一些帮助,基本上我正在学习使用tensorflow,我决定最简单的方法是在VMware和/或Virtualbox上安装ubuntu,然后访问ipython笔记本(anaconda附带
有谁知道在执行单元格之前是否有选项(或建议的 hack)可以让 IPython 笔记本自动保存? 很多时候我一直在做一些事情而没有保存很长一段时间,然后我执行了一个愚蠢的命令,该命令在控制台上打印了如
我正在尝试在另一个文件中运行 .ipynb 文件。我想嵌套这两个的原因是因为在其中一个中,我安装了所有 conda 软件包、github 存储库,并且我不想为我将暂时使用的演示文件重做整个事情。我使用
我有多个相互链接的 Jupyter 笔记本,例如 Notebook1.ipydb 包含指向 Notebook2.ipydb 的链接,其 Markdown [Notebook2](Notebook2.i
我使用 jupyter python 中的 matplotlib 库生成了下图。正如你所看到的,我有两个不同的数据,一个是红色,另一个是绿色。我的问题是,有什么办法可以让我有一个小侧窗来说明每种颜色的
我是一名优秀的程序员,十分优秀!