
apache-spark - Spark SQL SaveMode.Overwrite throws java.io.FileNotFoundException and asks for "REFRESH TABLE tableName"


With Spark SQL, how should we read data from a folder in HDFS, make some changes, and save the updated data back to the same HDFS folder with the Overwrite save mode, without hitting a FileNotFoundException?

import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.SparkConf

val sparkConf: SparkConf = new SparkConf()
val sparkSession = SparkSession.builder.config(sparkConf).getOrCreate()
val df = sparkSession.read.parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20")
val newDF = df.select("a", "b", "c")

newDF.write.mode(SaveMode.Overwrite)
  .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20") // doesn't work
newDF.write.mode(SaveMode.Overwrite)
  .parquet("hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-21") // works


A FileNotFoundException is thrown when we read data from the HDFS directory "d=2017-03-20" and save (SaveMode.Overwrite) the updated data back to that same HDFS directory "d=2017-03-20":

Caused by: org.apache.spark.SparkException: Task failed while writing rows
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.FileNotFoundException: File does not exist: hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20/part-05020-35ea100f-829e-43d9-9003061-1788904de770.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:157)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:102)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.scan_nextBatch$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$SingleDirectoryWriteTask.execute(FileFormatWriter.scala:243)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1341)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
... 8 more


The following attempts still produce the same error. How can this be solved with Spark SQL? Thanks!

val hdfsDirPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"

val df = sparkSession.read.parquet(hdfsDirPath)

val newdf = df
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)


Or:

val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")

sparkSession.sql("TRUNCATE TABLE orgtable")
sparkSession.sql("INSERT INTO orgtable SELECT * FROM tmptable")

val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)


Or:

val df = sparkSession.read.parquet(hdfsDirPath)
df.createOrReplaceTempView("orgtable")
sparkSession.sql("SELECT * from orgtable").createOrReplaceTempView("tmptable")

sparkSession.sql("REFRESH TABLE orgtable")
sparkSession.sql("ALTER VIEW tmptable RENAME TO orgtable")

val newdf = sparkSession.sql("SELECT * FROM orgtable")
newdf.write.mode(SaveMode.Overwrite).parquet(hdfsDirPath)

Best Answer

I solved this by first writing the DataFrame to a temporary directory, then deleting the source directory that was read, and finally renaming the temporary directory to the source name.
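The error happens because the read is lazy: SaveMode.Overwrite deletes the target directory at the start of the write job, so the tasks still reading the old Parquet files find them gone. Below is a minimal sketch of the temp-directory approach described above, using the Hadoop FileSystem API; the HDFS address is the placeholder from the question, and the "_tmp" suffix and variable names are illustrative choices, not part of the original answer.

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.{SaveMode, SparkSession}

val sparkSession = SparkSession.builder.getOrCreate()

val srcPath = "hdfs://xxx.xxx.xxx.xxx:xx/test/d=2017-03-20"
val tmpPath = srcPath + "_tmp" // illustrative scratch location, assumed free

val newDF = sparkSession.read.parquet(srcPath).select("a", "b", "c")

// 1. Write the result somewhere else first; the source files stay in place,
//    so the lazy read can still find them while the write tasks run.
newDF.write.mode(SaveMode.Overwrite).parquet(tmpPath)

// 2. Swap the directories: drop the old data, move the new data into place.
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
fs.delete(new Path(srcPath), true)              // recursive delete
fs.rename(new Path(tmpPath), new Path(srcPath)) // returns false on failure

Note that the delete-then-rename step is not atomic, so if the job dies between the two calls the data is briefly missing from the source path. Caching and materializing the DataFrame (for example df.cache() followed by df.count()) before overwriting is another commonly suggested workaround, but it can still fail if cached partitions are evicted and Spark recomputes them from the already-deleted source files.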

Regarding apache-spark - Spark SQL SaveMode.Overwrite throws java.io.FileNotFoundException and asks for "REFRESH TABLE tableName", see the similar question on Stack Overflow: https://stackoverflow.com/questions/42920748/
