apache-spark - FileNotFoundException: Spark save fails. Cannot clear cache from Dataset[T] avro

I am getting the following error when saving a DataFrame in Avro for the second time. If I delete sub_folder/part-00000-XXX-c000.avro after saving and then try to save the same dataset, I get:

FileNotFoundException: File /.../main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist. It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
  • If I delete not only from sub_folder but from main_folder as well, the problem does not happen, but I can't afford that.
  • The problem does not actually happen when trying to save the dataset in any other format.
  • Saving an empty dataset does not cause the error.

The message suggests that a table needs to be refreshed, but as the output of sparkSession.catalog.listTables().show() shows, there are no tables to refresh:

+----+--------+-----------+---------+-----------+
|name|database|description|tableType|isTemporary|
+----+--------+-----------+---------+-----------+
+----+--------+-----------+---------+-----------+

The DataFrame saved earlier looks like this. The application is supposed to update it:

+--------------------+--------------------+
|                Col1|                Col2|
+--------------------+--------------------+
|[123456, , ABC, [...|[[v1CK, RAWNAME1_,..|
|[123456, , ABC, [...|[[BG8M, RAWNAME2_...|
+--------------------+--------------------+

To me this is clearly a cache problem. However, all attempts at clearing the cache have failed:

dataset.write
  .format("avro")
  .option("path", path)
  .mode(SaveMode.Overwrite) // Any save mode gives the same error.
  .save()

// Moving this either before or after saving doesn't help.
sparkSession.catalog.clearCache()

// This will not un-persist any cached data that is built upon this Dataset.
dataset.cache().unpersist()
dataset.unpersist()

This is how I read the dataset:

private def doReadFromPath[T <: SpecificRecord with Product with Serializable: TypeTag: ClassTag](path: String): Dataset[T] = {
  val df = sparkSession.read
    .format("avro")
    .load(path)
    .select("*")

  df.as[T]
}
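For context, a hypothetical call site (MyRecord is a placeholder for the actual SpecificRecord class generated from the Avro schema; the path is taken from the stack trace) showing the read-then-overwrite-the-same-path pattern that triggers the error:

// Hypothetical usage; MyRecord stands in for the Avro-generated class.
val ds: Dataset[MyRecord] = doReadFromPath[MyRecord]("/DATA/XXX/main_folder/sub_folder")

// ... transformations ...

// Writing back to the very path the Dataset was read from is what fails on
// the second run: the lazy plan still references the original part files.
ds.write
  .format("avro")
  .mode(SaveMode.Overwrite)
  .save("/DATA/XXX/main_folder/sub_folder")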

Finally, here is the stack trace. Any help is greatly appreciated:

ERROR [task-result-getter-3] (Logging.scala:70) - Task 0 in stage 9.0 failed 1 times; aborting job
ERROR [main] (Logging.scala:91) - Aborting job 150de02a-ac6a-4d42-824d-5db44a98c19a.
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 11, localhost, executor driver): org.apache.spark.SparkException: Task failed while writing rows.
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:254)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:168)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.FileNotFoundException: File file:/DATA/XXX/main_folder/sub_folder/part-00000-3e7064c0-4a82-424c-80ca-98ce75766972-c000.avro does not exist
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.org$apache$spark$sql$execution$datasources$FileScanRDD$$anon$$readCurrentFile(FileScanRDD.scala:127)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:177)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:101)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$11$$anon$1.hasNext(WholeStageCodegenExec.scala:619)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:241)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:239)
at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:245)
... 10 more

Best Answer

Reading from a location and writing to that same location will cause this issue. It was also discussed in this forum, along with my answer there.

The following message in the error is misleading; the actual problem is reading from and writing to the same location:

You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL
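For what it's worth, with a path-based read like yours there is no registered table to refresh; the closest equivalent is the catalog's path-based invalidation, shown below, but it does not fix the root cause here:

// Invalidates cached data/metadata for anything read from this path.
// It does not help here, because the overwrite deletes the very files
// the same job is still trying to read.
sparkSession.catalog.refreshByPath("/DATA/XXX/main_folder/sub_folder")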

I am giving an example different from yours (parquet is used here; in your case it is avro).

I have two options for you.

Option 1 (cache and show will work as shown below...):

import org.apache.spark.sql.functions._
import spark.implicits._ // needed for toDS/toDF on a Seq

val df = Seq((1, 10), (2, 20), (3, 30)).toDS.toDF("sex", "date")

df.show(false)

df.repartition(1).write.format("parquet").mode("overwrite").save(".../temp") // save it
val df1 = spark.read.format("parquet").load(".../temp") // read it back again

val df2 = df1.withColumn("cleanup", lit("Rod want to cleanup")) // like you said, the cleanup you want

// THE 2 STEPS BELOW ARE IMPORTANT: `cache` plus a light action such as
// `show(2)` (or `count`), without which the FileNotFoundException will come.

df2.cache // cache to avoid FileNotFoundException
df2.show(2, false) // light action to avoid FileNotFoundException
// or println(df2.count) // any action works

df2.repartition(1).write.format("parquet").mode("overwrite").save(".../temp")
println("Rod saved in the same directory he read from; the final records after cleanup are:")
df2.show(false)
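This works because cache followed by an action materializes df2 before the overwrite begins: once the rows are held in memory, the final write no longer needs to re-scan the input files that mode("overwrite") is about to delete. Without that action the plan stays lazy, so the write would try to read the very files it is replacing, which is exactly the FileNotFoundException above.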

Option 2:

1) Save the DataFrame to a different avro folder.

2) Delete the old avro folder.

3) Finally, rename the newly created avro folder to the old name. This will work; a minimal sketch follows below.
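A minimal sketch of option 2 using the Hadoop FileSystem API (the folder names and the dataset/sparkSession handles are assumptions carried over from the question):

import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.sql.SaveMode

// Hypothetical layout, mirroring the paths in the question.
val oldDir = new Path("/DATA/XXX/main_folder/sub_folder")
val newDir = new Path("/DATA/XXX/main_folder/sub_folder_new")

// 1) Save to a different folder, so the write never overlaps the read.
dataset.write.format("avro").mode(SaveMode.Overwrite).save(newDir.toString)

// 2) Delete the old folder (recursively), 3) rename the new one to the old name.
val fs = FileSystem.get(sparkSession.sparkContext.hadoopConfiguration)
fs.delete(oldDir, true)
fs.rename(newDir, oldDir)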

Regarding apache-spark - FileNotFoundException: Spark save fails. Cannot clear cache from Dataset[T] avro, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/61725822/
