
java - Spark writing files to s3a in a multi-threaded way


I am using local Spark to read from and write to S3. To write back to S3, I use the Java concurrency utilities so that I can write in a multi-threaded way.

Here is my implementation.

The ConvertToCsv method contains the spark.write operation:

for (String Id : listOfId) {
    Future<?> future = executor.submit(() -> {
        ConvertToCsv(dataFrame, destinationPath, Id);
    });
    futures.add(future);
}

I get this error:

No such file or directory: s3a://kira-bucket-parquet/collection/82709dd1-8924-481c-9d93-14a9e2e0c524/5e67e9d5-2d8b-4c4b-928a-4736485af3ca/_temporary/0
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:2269)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:2163)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getFileStatus(S3AFileSystem.java:2102)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerListStatus(S3AFileSystem.java:1903)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$listStatus$9(S3AFileSystem.java:1882)
    at org.apache.hadoop.fs.s3a.Invoker.once(Invoker.java:109)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.listStatus(S3AFileSystem.java:1882)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1919)
    at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1961)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.getAllCommittedTaskPaths(FileOutputCommitter.java:269)
    at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitJob(FileOutputCommitter.java:309)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitJob(HadoopMapReduceCommitProtocol.scala:166)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:213)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:154)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala

The solution I have come across is to configure the S3A committers.

How do I configure the S3A committers in local Spark? Is there any other solution?

Best Answer

To commit work safely to S3 (even from local Spark), you can use the S3A committers. The default FileOutputCommitter commits a job by listing and renaming files under a _temporary directory, which is unreliable on S3; and when several jobs write under the same destination in parallel, one job's commit or cleanup can delete that directory while another job still needs it, which is most likely what the stack trace above shows.

Although the committers live in the hadoop-aws JAR, they were designed and tested against Spark as well as MapReduce.

Consult the documentation for the details of enabling and choosing a committer.
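As a rough illustration, here is a minimal sketch of what enabling the S3A "directory" staging committer in a local SparkSession could look like. The property names follow the Hadoop S3A committer documentation, but treat the class names, the committer choice, and the module availability as assumptions to verify against your Spark and Hadoop versions: the PathOutputCommitProtocol and BindingParquetOutputCommitter bindings ship in Spark's optional spark-hadoop-cloud module, which must be on the classpath along with hadoop-aws.

    import org.apache.spark.sql.SparkSession;

    public class S3ACommitterLocalExample {
        public static void main(String[] args) {
            // Sketch only: enable the S3A "directory" staging committer for a local run.
            // Assumes hadoop-aws and spark-hadoop-cloud are on the classpath.
            SparkSession spark = SparkSession.builder()
                    .master("local[*]")
                    // Pick a committer: "directory", "partitioned" or "magic".
                    .config("spark.hadoop.fs.s3a.committer.name", "directory")
                    // Route Spark's commit protocol through Hadoop's PathOutputCommitter.
                    .config("spark.sql.sources.commitProtocolClass",
                            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
                    .config("spark.sql.parquet.output.committer.class",
                            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
                    .getOrCreate();

            // ... run the multi-threaded ConvertToCsv writes as before, then:
            spark.stop();
        }
    }

Instead of renaming a _temporary directory at job commit, these committers complete multipart uploads, which is what makes the commit step safe on S3.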

Regarding java - Spark writing files to s3a in a multi-threaded way, we found a similar question on Stack Overflow: https://stackoverflow.com/questions/59445820/
