
python - FileNotFoundException on the _temporary/0 directory when saving Parquet files


On an Azure HDInsight cluster, using Python, we save Spark dataframes as Parquet files to Azure Data Lake Storage Gen2 with the following code:

df.write.parquet('abfs://my_dwh_container@my_storage_account.dfs.core.windows.net/mypath', 'overwrite', compression='snappy')

This normally works, but since we recently upgraded the cluster to run more scripts at the same time (around 10 to 15), we consistently get the following exception for a small, varying subset of the scripts:

Py4JJavaError: An error occurred while calling o2232.parquet. : java.io.FileNotFoundException: Operation failed: "The specified path does not exist.", 404, PUT, https://my_storage_account.dfs.core.windows.net/mypath/_temporary/0?resource=directory&timeout=90, PathNotFound, "The specified path does not exist."

I think all the Spark jobs and tasks actually succeed, including the ones that save the table, but the Python script then exits with this exception.


Background information

We are using Spark 2.4.5.4.1.1.2, with Scala version 2.11.12, OpenJDK 64-Bit Server VM 1.8.0_265, and Hadoop 3.1.2.4.1.1.2.

Stack trace:

  File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 843, in parquet
df_to_save.write.parquet(blob_path, mode, compression='snappy')
self._jwrite.parquet(path)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2232.parquet.
: java.io.FileNotFoundException: Operation failed: "The specified path does not exist.", 404, PUT, https://my_dwh_container@my_storage_account.dfs.core.windows.net/mypath/_temporary/0?resource=directory&timeout=90, PathNotFound, "The specified path does not exist. RequestId:1870ec49-e01f-0101-72f8-f260fe000000 Time:2021-12-17T03:42:35.8434071Z"
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1178)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.mkdirs(AzureBlobFileSystem.java:477)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2288)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:382)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:162)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)

Logs:

21/12/17 03:42:02 INFO DAGScheduler [Thread-11]: Job 2 finished: saveAsTable at NativeMethodAccessorImpl.java:0, took 1.120535 s
21/12/17 03:42:02 INFO FileFormatWriter [Thread-11]: Write Job 11fc45a5-d398-4f9a-8350-f928c3722886 committed.
21/12/17 03:42:02 INFO FileFormatWriter [Thread-11]: Finished processing stats for write job 11fc45a5-d398-4f9a-8350-f928c3722886.
(...)
21/12/17 03:42:05 INFO ParquetFileFormat [Thread-11]: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: File Output Committer Algorithm version is 2
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false, move _temporary folders into Trash: false
21/12/17 03:42:05 INFO SQLHadoopMapReduceCommitProtocol [Thread-11]: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: File Output Committer Algorithm version is 2
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false, move _temporary folders into Trash: false
21/12/17 03:42:05 INFO SQLHadoopMapReduceCommitProtocol [Thread-11]: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:28 ERROR ApplicationMaster [Driver]: User application exited with status 1
21/12/17 03:42:28 INFO ApplicationMaster [Driver]: Final app status: FAILED, exitCode: 1, (reason: User application exited with status 1)

There is also another version of this exception, where it does occur inside a Spark task, which then fails; Spark automatically restarts the failed task and it usually succeeds. In some cases the AM reports the application as failed, and I don't understand why, since all the jobs succeeded.

Possible cause

As described in Spark _temporary creation reason, I would expect the _temporary directory not to be moved until all tasks are done. Looking at the stack trace, the failure happens in AzureBlobFileSystem.mkdirs, which suggests it is trying to create subdirectories somewhere under _temporary/0 but cannot find the 0 directory. I am not sure whether the _temporary directory itself exists at that point.
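
Purely as a debugging aid (not part of the original question), a minimal PySpark sketch like the following can check whether the committer's _temporary/0 staging directory exists at the moment of failure. It assumes an active SparkSession named spark and reuses the same abfs:// output path as the write call above; both are illustrative assumptions.

# Debugging sketch: check whether the _temporary/0 staging directory exists.
# Assumes an active SparkSession `spark` and the abfs:// path from the write above.
jvm = spark._jvm
hadoop_conf = spark._jsc.hadoopConfiguration()
staging = jvm.org.apache.hadoop.fs.Path(
    'abfs://my_dwh_container@my_storage_account.dfs.core.windows.net/mypath/_temporary/0')
fs = staging.getFileSystem(hadoop_conf)
print('_temporary/0 exists:', fs.exists(staging))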

Related questions

Options to try:

Best answer

ABFS is a "real" filesystem, so the S3A zero-rename committers are not needed. Indeed, they won't work. And the client is fully open source - take a look at the hadoop-azure module.

ADLS Gen2 storage does have scale problems, but unless you are trying to commit 10,000 files, or clean up a very deep directory tree, you won't hit them. If you do get errors about renaming individual files and you are working at that scale, (a) talk to Microsoft about increasing your allocated capacity and (b) pick up https://github.com/apache/hadoop/pull/2971

It's not that. My guess is that you actually have multiple jobs writing to the same output path, with one cleaning up while the other is setting up. In particular, their job IDs both appear to be "0". Because the same job ID is used, not only do task setup and task cleanup get mixed up, but when one job commits it may also include the output of all successfully committed task attempts from job 2.

I believe this is a known problem with Spark standalone deployments, but I can't find the relevant JIRA. SPARK-24552 is close, but should have been fixed in your version. SPARK-33402, "jobs launched in the same second have duplicate MapReduce JobIDs", is about job IDs coming only from the system's current time, not 0. Still: you can try upgrading your Spark version to see whether the problem goes away.

My recommendations

  1. Make sure your jobs do not write to the same table at the same time. Things get messy otherwise (one way to avoid this is sketched after this list).
  2. Get the most recent version of Spark that you are comfortable with.
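
For recommendation 1, here is a hedged sketch of one way to keep concurrently running scripts from ever sharing an output path (and hence a _temporary directory): give each run its own path. The uuid-based suffix and the base path below are illustrative assumptions, not something stated in the answer.

import uuid

# Illustrative only: derive a per-run output path so two concurrent scripts
# never write into the same _temporary directory.
base = 'abfs://my_dwh_container@my_storage_account.dfs.core.windows.net/mypath'
run_path = '{}_{}'.format(base, uuid.uuid4().hex)

df.write.parquet(run_path, 'overwrite', compression='snappy')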

Regarding "python - FileNotFoundException on the _temporary/0 directory when saving Parquet files", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/70393987/
