apache-spark - Spark Streaming: Text data source supports only a single column


I am consuming data from Kafka and then streaming it to HDFS.

The data stored in the Kafka topic trial looks like this:

hadoop
hive
hive
kafka
hive

However, when I submit the code, it returns:

Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: Text data source supports only a single column, and you have 7 columns.;
=== Streaming Query ===
Identifier: [id = 2f3c7433-f511-49e6-bdcf-4275b1f1229a, runId = 9c0f7a35-118a-469c-990f-af00f55d95fb]
Current Committed Offsets: {KafkaSource[Subscribe[trial]]: {"trial":{"2":13,"1":13,"3":12,"0":13}}}
Current Available Offsets: {KafkaSource[Subscribe[trial]]: {"trial":{"2":13,"1":13,"3":12,"0":14}}}

My question is: as shown above, the data stored in Kafka has only ONE column, so why does the program say there are 7 columns?

Any help is appreciated.

My spark-streaming code:
def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder.master("local[4]")
    .appName("SpeedTester")
    .config("spark.driver.memory", "3g")
    .getOrCreate()

  val ds = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.95.20:9092")
    .option("subscribe", "trial")
    .option("startingOffsets", "earliest")
    .load()
    .writeStream
    .format("text")
    .option("path", "hdfs://192.168.95.21:8022/tmp/streaming/fixed")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start()
    .awaitTermination()
}

Best Answer

This is explained in the Structured Streaming + Kafka Integration Guide:

Each row in the source has the following schema:

Column          Type
key             binary
value           binary
topic           string
partition       int
offset          long
timestamp       long
timestampType   int
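
A quick way to confirm this yourself is to print the schema of the DataFrame that load() returns, before attaching any sink. This is a minimal sketch reusing the reader options from the question (the broker address is the asker's own):

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.95.20:9092")
  .option("subscribe", "trial")
  .load()

// Prints all seven columns exposed by the Kafka source:
// key, value, topic, partition, offset, timestamp, timestampType
df.printSchema()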



That is exactly seven columns. If you only want to write the payload (the value), select that column and cast it to a string:

spark.readStream
  ...
  .load()
  .selectExpr("CAST(value AS STRING)")
  .writeStream
  ...
  .awaitTermination()
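
Applied to the asker's original job, the fix is a single line added before writeStream. The following is a sketch only; the broker address, HDFS path, and checkpoint location are copied verbatim from the question and would need to match your own environment:

import org.apache.spark.sql.SparkSession

def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder.master("local[4]")
    .appName("SpeedTester")
    .config("spark.driver.memory", "3g")
    .getOrCreate()

  spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.95.20:9092")
    .option("subscribe", "trial")
    .option("startingOffsets", "earliest")
    .load()
    // Keep only the message payload; the text sink accepts exactly one string column
    .selectExpr("CAST(value AS STRING)")
    .writeStream
    .format("text")
    .option("path", "hdfs://192.168.95.21:8022/tmp/streaming/fixed")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start()
    .awaitTermination()
}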

On apache-spark - Spark Streaming: Text data source supports only a single column, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/53532968/
