
scala - Error writing a DataFrame to a local single-node Kafka in Spark 2.2.0 standalone mode


The data comes from the Databricks notebook demo: Five Spark SQL Helper Utility Functions to Extract and Explore Complex Data Types.

However, when I try the same code on my own laptop, I always hit an error.

First, load the JSON data as a DataFrame:

res2: org.apache.spark.sql.DataFrame = [battery_level: string, c02_level: string]

scala> res2.show

+-------------+---------+
|battery_level|c02_level|
+-------------+---------+
| 7| 886|
| 5| 1378|
| 8| 917|
| 8| 1504|
| 8| 831|
| 9| 1304|
| 8| 1574|
| 9| 1208|
+-------------+---------+
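For context, here is a minimal sketch of one way such a DataFrame could be built in the spark-shell; the inline JSON strings are assumptions for illustration and are not taken from the original notebook:

// Hypothetical reconstruction: build a small DataFrame with the same two
// string columns as res2. `spark` is the SparkSession provided by spark-shell.
import spark.implicits._

val sampleJson = Seq(
  """{"battery_level": "7", "c02_level": "886"}""",
  """{"battery_level": "5", "c02_level": "1378"}"""
)

// DataFrameReader.json(Dataset[String]) is available since Spark 2.2.0
val res2 = spark.read.json(sampleJson.toDS())
  .select("battery_level", "c02_level")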

Second, write the data to Kafka:
res2.write 
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "test")
.save()

All of this follows the notebook demo above and the official documentation steps.

But it fails with this error:
scala> res2.write 
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("topic", "iot-devices")
.save()

org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
at org.apache.spark.sql.kafka010.KafkaWriter$$anonfun$6.apply(KafkaWriter.scala:72)
at scala.Option.getOrElse(Option.scala:121)
at org.apache.spark.sql.kafka010.KafkaWriter$.validateQuery(KafkaWriter.scala:71)
at org.apache.spark.sql.kafka010.KafkaWriter$.write(KafkaWriter.scala:87)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createRelation(KafkaSourceProvider.scala:165)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
... 52 elided

I assumed it might be a Kafka problem, so I tested reading a DataFrame from Kafka to confirm connectivity:
scala> val kaDF = spark.read
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", "iot-devices")
.load()
kaDF: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]

scala> kaDF.show

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key| value| topic|partition|offset| timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null| [73 73 73 73 73]|iot-devices| 0| 0|2017-09-27 11:11:...| 0|
|null|[64 69 63 6B 20 3...|iot-devices| 0| 1|2017-09-27 11:29:...| 0|
|null| [78 69 78 69]|iot-devices| 0| 2|2017-09-27 11:29:...| 0|
|null|[31 20 32 20 33 2...|iot-devices| 0| 3|2017-09-27 11:30:...| 0|
+----+--------------------+-----------+---------+------+--------------------+-------------+
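The key and value columns come back as binary. As a small follow-up, the standard cast pattern from the Spark Kafka integration docs makes the consumed records human-readable:

// Cast the binary key/value columns to strings so the rows are readable
kaDF.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "offset")
  .show(false)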

So reading the topic "iot-devices" from bootstrap.servers localhost:9092 clearly works.

I have searched a lot online but still cannot solve it.

Can anyone with Spark SQL experience tell me what is wrong with my command?

Thanks!

Best Answer

The error message clearly shows the source of the problem:

org.apache.spark.sql.AnalysisException: Required attribute 'value' not found;



The Dataset being written has to have at least a value column (key and topic are optional), and res2 only has battery_level and c02_level.

You can, for example:
import org.apache.spark.sql.functions._

res2.select(to_json(struct($"battery_level", $"c02_level")).alias("value"))
  .write
  ...
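Filled out under the same assumptions as the question (local broker on localhost:9092, target topic "iot-devices"), a complete batch write might look like the sketch below; packing every column into a JSON string named value satisfies the Kafka sink's required-column check:

import org.apache.spark.sql.functions.{to_json, struct}

// Build a single "value" column from the existing columns, then write to Kafka.
// The key column is optional, so it is simply omitted here.
// `$` comes from spark.implicits._, which the spark-shell imports automatically.
res2.select(to_json(struct($"battery_level", $"c02_level")).alias("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("topic", "iot-devices")
  .save()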

Regarding "scala - Error writing a DataFrame to a local single-node Kafka in Spark 2.2.0 standalone mode", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/46454014/
