
apache-spark - Spark exception: Task failed while writing rows


I use Spark streaming to consume data from Kafka and then write it to HDFS in ORC format.

The data stored in Kafka looks like this:

hadoop
hive
impala
hive

My code:

def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder.master("local[4]")
    .appName("SpeedTester")
    .config("spark.driver.memory", "3g")
    .getOrCreate()

  val ds = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.95.20:9092")
    .option("subscribe", "trial")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value as string)")
    .writeStream
    .outputMode("append")
    .format("orc")
    .option("path", "hdfs://192.168.95.19:8022/user/hive/warehouse/test.db/demo")
    .option("checkpointLocation", "/tmp/checkpoint")
    .start()
    .awaitTermination()
}

This code writes the data to HDFS successfully when the sink format is text (see the sketch after the stack trace), but when I change it to orc it returns:

Caused by: org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:285)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:197)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:196)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:381)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://192.168.95.19:8022/user/hive/warehouse/test.db/demo/part-00000-cfd9991f-e503-4140-811b-a00f7da7191e-c000.snappy.orc
at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1270)
at org.apache.hadoop.hdfs.DistributedFileSystem$20.doCall(DistributedFileSystem.java:1262)
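
For reference, the working text-format variant presumably looked like the following. This is a sketch reconstructed from the description above; the demo_text output path and checkpoint_text checkpoint directory are placeholders, not paths from the original post:

// Assumed working variant: the same pipeline, but with a text sink instead of ORC.
// The text sink needs a single string column, which CAST(value AS string) provides.
spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "192.168.95.20:9092")
  .option("subscribe", "trial")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS string)")
  .writeStream
  .outputMode("append")
  .format("text")
  .option("path", "hdfs://192.168.95.19:8022/user/hive/warehouse/test.db/demo_text") // placeholder path
  .option("checkpointLocation", "/tmp/checkpoint_text")                              // placeholder checkpoint
  .start()
  .awaitTermination()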

What is causing this problem, and how can it be fixed? Any help is appreciated.


By the way, the Hive table creation statement is:

create table test.demo (demo string)
stored as orc;

Best Answer

You need to create a new Hive session and then use it to store the data in ORC format. The code would look something like this (untested, since I don't have access to a Spark cluster):

import org.apache.spark.sql.{SaveMode, SparkSession}

def main(args: Array[String]): Unit = {
  val spark = SparkSession
    .builder.master("local[4]")
    .appName("SpeedTester")
    .config("spark.driver.memory", "3g")
    .getOrCreate()

  // create a Hive context from the underlying SparkContext
  // (HiveContext is deprecated in Spark 2.x; enableHiveSupport() on the builder is the modern equivalent)
  val hiveContext = new org.apache.spark.sql.hive.HiveContext(spark.sparkContext)

  // read the Kafka topic as a batch DataFrame (a streaming DataFrame created with
  // readStream cannot be written via .write) and keep only the message value as a
  // string, so the output matches the single-column Hive table
  val ds = spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.95.20:9092")
    .option("subscribe", "trial")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(value AS string) AS demo")

  // write the data frame to the table location as a directory of ORC files
  ds.write.mode(SaveMode.Overwrite)
    .format("orc")
    .save("hdfs://192.168.95.19:8022/user/hive/warehouse/test.db/demo/")
}

Try this and let me know if it works!
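
As a follow-up, once the ORC files are in the table's warehouse directory, a Hive-enabled Spark session can read the table back to confirm that Hive sees the data. This is a minimal sketch, not part of the original answer, and it assumes Hive support (hive-site.xml) is configured for the cluster:

import org.apache.spark.sql.SparkSession

object VerifyDemoTable {
  def main(args: Array[String]): Unit = {
    // Hive-enabled session; the modern replacement for HiveContext
    val spark = SparkSession
      .builder.master("local[4]")
      .appName("VerifyDemoTable")
      .enableHiveSupport()
      .getOrCreate()

    // Read back the ORC-backed table created with
    // "create table test.demo (demo string) stored as orc"
    spark.sql("SELECT * FROM test.demo").show(20, truncate = false)

    spark.stop()
  }
}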

Regarding "apache-spark - Spark exception: Task failed while writing rows", a similar question can be found on Stack Overflow: https://stackoverflow.com/questions/53556449/
