- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
在 Azure HDInsight 集群上使用 Python,我们使用以下代码将 Spark 数据帧作为 Parquet 文件保存到 Azure Data Lake Storage Gen2:
df.write.parquet('abfs://my_dwh_container@my_storage_account.dfs.core.windows.net/mypath, 'overwrite', compression='snappy')
这通常有效,但是当我们最近升级集群以同时运行更多脚本(大约 10 到 15 个)时,对于一小部分脚本,我们始终会遇到以下异常:
Py4JJavaError: An error occurred while calling o2232.parquet. :java.io.FileNotFoundException: Operation failed: "The specified pathdoes not exist.", 404, PUT,https://my_storage_account.dfs.core.windows.net/mypath/_temporary/0?resource=directory&timeout=90,PathNotFound, "The specified path does not exist."
我认为所有 Spark 作业和任务实际上都成功了,包括保存表的作业和任务,但随后 Python 脚本异常退出。
我们使用的是 Spark 2.4.5.4.1.1.2。使用 Scala 版本 2.11.12、OpenJDK 64 位服务器 VM、1.8.0_265、Hadoop 3.1.2.4.1.1.2
堆栈跟踪:
File "/usr/hdp/current/spark2-client/python/pyspark/sql/readwriter.py", line 843, in parquet
df_to_save.write.parquet(blob_path, mode, compression='snappy')
self._jwrite.parquet(path)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/usr/hdp/current/spark2-client/python/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/usr/hdp/current/spark2-client/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o2232.parquet.
: java.io.FileNotFoundException: Operation failed: "The specified path does not exist.", 404, PUT, https://my_dwh_container@my_storage_account.dfs.core.windows.net/mypath/_temporary/0?resource=directory&timeout=90, PathNotFound, "The specified path does not exist. RequestId:1870ec49-e01f-0101-72f8-f260fe000000 Time:2021-12-17T03:42:35.8434071Z"
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.checkException(AzureBlobFileSystem.java:1178)
at org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.mkdirs(AzureBlobFileSystem.java:477)
at org.apache.hadoop.fs.FileSystem.mkdirs(FileSystem.java:2288)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.setupJob(FileOutputCommitter.java:382)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.setupJob(HadoopMapReduceCommitProtocol.scala:162)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:139)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:282)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:238)
at java.lang.Thread.run(Thread.java:748)
日志:
21/12/17 03:42:02 INFO DAGScheduler [Thread-11]: Job 2 finished: saveAsTable at NativeMethodAccessorImpl.java:0, took 1.120535 s
21/12/17 03:42:02 INFO FileFormatWriter [Thread-11]: Write Job 11fc45a5-d398-4f9a-8350-f928c3722886 committed.
21/12/17 03:42:02 INFO FileFormatWriter [Thread-11]: Finished processing stats for write job 11fc45a5-d398-4f9a-8350-f928c3722886.
(...)
21/12/17 03:42:05 INFO ParquetFileFormat [Thread-11]: Using default output committer for Parquet: org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: File Output Committer Algorithm version is 2
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false, move _temporary folders into Trash: false
21/12/17 03:42:05 INFO SQLHadoopMapReduceCommitProtocol [Thread-11]: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: File Output Committer Algorithm version is 2
21/12/17 03:42:05 INFO FileOutputCommitter [Thread-11]: FileOutputCommitter skip cleanup _temporary folders under output directory:false, ignore cleanup failures: false, move _temporary folders into Trash: false
21/12/17 03:42:05 INFO SQLHadoopMapReduceCommitProtocol [Thread-11]: Using output committer class org.apache.parquet.hadoop.ParquetOutputCommitter
21/12/17 03:42:28 ERROR ApplicationMaster [Driver]: User application exited with status 1
21/12/17 03:42:28 INFO ApplicationMaster [Driver]: Final app status: FAILED, exitCode: 1, (reason: User application exited with status 1)
这个异常还有另一个版本,它确实发生在 Spark 任务中,然后失败,但 Spark 会自动重新启动失败的任务,通常它会成功。在某些情况下,AM 会报告应用程序失败,但我不明白为什么,因为所有作业都成功了。
可能的原因
如 Spark _temporary creation reason 所示我希望在完成所有任务之前不会移动 _temporary 目录。查看堆栈跟踪,它发生在 AzureBlobFileSystem.mkdirs 中,这表明它正在尝试在 _temporary/0
下的某处创建子目录,但找不到 0
目录。我不确定此时 _temporary
目录是否存在。
相关问题
_temporary
本身不存在,但我不明白为什么 mkdirs 不创建它。但我不认为 AzureBlobFileSystem 是开源的?PATH_ALREADY_EXISTS
标志的 checkException,这对我来说没有意义。可以尝试的选项:
最佳答案
ABFS 是一个“真正的”文件系统,因此不需要 S3A 零重命名提交者。确实,它们不会起作用。而且客户端是完全开源的 - 查看 hadoop-azure模块。
ADLS gen2 存储确实存在规模问题,但除非您尝试提交 10,000 个文件,或清理非常深的目录树,否则您不会遇到这些问题。如果您确实收到有关 Elliott 重命名单个文件的错误消息,并且您正在执行这种规模的工作 (a) 与 Microsoft 讨论增加您分配的容量,以及 (b) 选择它 https://github.com/apache/hadoop/pull/2971
不是这样的。我猜想实际上您有多个作业写入同一个输出路径,一个正在清理而另一个正在设置。特别是 - 他们的工作 ID 似乎都是“0”。由于使用了相同的作业 ID,仅当任务设置和任务清理混淆时,当一个作业提交时,它可能包含来自已成功提交的所有任务尝试的作业 2 的输出。
我相信这是 spark 独立部署的一个已知问题,但我找不到相关的 JIRA。 SPARK-24552很接近,但应该已在您的版本中修复。 SPARK-33402 在同一秒内启动的作业具有重复的 MapReduce JobID。这是关于作业 ID 仅来自系统当前时间,而不是 0。但是:您可以尝试升级您的 spark 版本以查看它是否消失。
我的建议
关于python - 保存 Parquet 文件时 _temporary/0 目录上的 FileNotFoundException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/70393987/
Spark 将正在进行的处理数据存储在 _temporary 文件夹中。作业完成后,数据将移动到其最终目的地。但是,当有数万个分区时,将文件从一个地方移动到另一个地方需要相当长的时间。问题:如何加快这
为什么spark在将结果保存到文件系统时,将结果文件上传到_temporary目录,然后将它们移动到输出文件夹而不是直接上传到输出文件夹? 最佳答案 在使用文件系统时,两阶段过程是确保最终结果一致性的
运行后 hdfs dfs -rm -r -skipTrash hdfs://valid/output/path hdfs dfs -ls hdfs://valid/output/path 并验证此输出
是否可以更改 _temporary spark在写入之前保存其临时文件的目录? 特别是,由于我正在编写表的单个分区,因此我希望临时文件夹位于分区文件夹中。 有可能吗? 最佳答案 由于它的实现,没有办法
在 Azure HDInsight 集群上使用 Python,我们使用以下代码将 Spark 数据帧作为 Parquet 文件保存到 Azure Data Lake Storage Gen2: df.
我需要将数据帧上传到 S3 存储桶,但我对存储桶没有删除权限。有什么办法可以避免在 S3 上创建这个 _temporary 目录?也许在 spark 中以任何方式使用本地 FS 作为 _tempora
我正在使用 pyspark 从 Amazon S3 上的 Parquet 文件中读取数据帧,例如 dataS3 = sql.read.parquet("s3a://" + s3_bucket_in)
我使用pyspark 并使用 MLUtils saveaslibsvm在标记点上保存 RDD 它可以工作,但会将该文件保留在/_temporary/下所有工作节点中的多个文件中。 没有抛出错误,我想将
我是一名优秀的程序员,十分优秀!