- html - 出于某种原因,IE8 对我的 Sass 文件中继承的 html5 CSS 不友好?
- JMeter 在响应断言中使用 span 标签的问题
- html - 在 :hover and :active? 上具有不同效果的 CSS 动画
- html - 相对于居中的 html 内容固定的 CSS 重复背景?
我尝试使用 BucketingSink 和自定义 ParquetSinkWriter 在 HDFS 上使用 Apache Flink 写入 parquet 文件。
这里是代码,上面的错误表明何时启用检查点(在 BucketingSink 类中调用 snapshotState())时,下面的刷新方法不能安静工作。即使 writer 也通过“writer.close();”关闭但仍然收到“writer = createWriter();”的错误。有什么想法吗?谢谢
出现这样的错误:
org.apache.hadoop.fs.FileAlreadyExistsException: /user/hive/flink_parquet_fils_with_checkingpoint/year=20/month=2/day=1/hour=17/_part-4-9.in-progress for client 192.168.56.202 already exists at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:3003) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2890)
.... . at flink.untils.ParquetSinkWriter.flush(ParquetSinkWriterForecast.java:81) at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.snapshotState(BucketingSink.java:749)
import org.apache.flink.util.Preconditions;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import java.io.IOException;
/**
* Parquet writer.
*
* @param <T>
*/
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> {
private static final long serialVersionUID = -975302556515811398L;
private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
private final int pageSize = 64 * 1024;
private final String schemaRepresentation;
private transient Schema schema;
private transient ParquetWriter<GenericRecord> writer;
private transient Path path;
private int position;
public ParquetSinkWriter(String schemaRepresentation) {
this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation);
}
@Override
public void open(FileSystem fs, Path path) throws IOException {
this.position = 0;
this.path = path;
if (writer != null) {
writer.close();
}
writer = createWriter();
}
@Override
public long flush() throws IOException {
Preconditions.checkNotNull(writer);
position += writer.getDataSize();
writer.close();
writer = createWriter();
return position;
}
@Override
public long getPos() throws IOException {
Preconditions.checkNotNull(writer);
return position + writer.getDataSize();
}
@Override
public void close() throws IOException {
if (writer != null) {
writer.close();
writer = null;
}
}
@Override
public void write(T element) throws IOException {
Preconditions.checkNotNull(writer);
writer.write(element);
}
@Override
public Writer<T> duplicate() {
return new ParquetSinkWriter<>(schemaRepresentation);
}
private ParquetWriter<GenericRecord> createWriter() throws IOException {
if (schema == null) {
schema = new Schema.Parser().parse(schemaRepresentation);
}
return AvroParquetWriter.<GenericRecord>builder(path)
.withSchema(schema)
.withDataModel(new GenericData())
.withCompressionCodec(compressionCodecName)
.withPageSize(pageSize)
.build();
}
}
最佳答案
您尝试创建的文件似乎当前存在。这是因为您使用的是默认写入模式CREATE
,当文件存在时该模式会失败。您可以尝试做的就是更改您的代码以使用OVERWRITE
模式。您可以更改 createWriter()
方法以返回如下所示的内容:
return AvroParquetWriter.<GenericRecord>builder(path)
.withSchema(schema)
.withDataModel(new GenericData())
.withCompressionCodec(compressionCodecName)
.withPageSize(pageSize)
.withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
.build();
关于java - Flink ParquetSinkWriter FileAlreadyExistsException,我们在Stack Overflow上找到一个类似的问题: https://stackoverflow.com/questions/60120100/
我在 Ubuntu 14.04 上使用 Hadoop 2.5.1 现在我正在尝试使用 MultipleOutputs 类。这些作业可以在 8 行输入等小数据上正常运行。但是当我尝试用 1000 多行运
我尝试使用 BucketingSink 和自定义 ParquetSinkWriter 在 HDFS 上使用 Apache Flink 写入 parquet 文件。 这里是代码,上面的错误表明何时启用检
这段代码: File tmpFile = File.createTempFile(PREFIX_TMP, null, new File(reportPath)); logger.deb
我正在尝试在重新分区后将数据帧写入 s3 位置。但是,每当写入阶段失败并且 Spark 重试该阶段时,它就会抛出 FileAlreadyExistsException。 当我重新提交作业时,如果 sp
我在使用 Java 7 的 Files 类时遇到了一个看似奇怪的问题。我想在开始编写之前确保我的目录和文件存在以避免 FileNotFoundException,并且根据 Javadocs , cre
在我的代码中有一个用另一个文件替换一个文件的循环。 这是通过以下方式完成的: java.nio.file.Files.move(Path source, Path target, CopyOption
这个程序应该完成 MapReduce 工作。第一个作业的输出必须作为第二个作业的输入。 当我运行它时,出现两个错误: 线程“main”中的异常 org.apache.hadoop.mapred.Fil
这个问题在这里已经有了答案: How to overwrite the output directory in spark (9 个回答) 关闭 6 年前。 我正在运行这个命令行: hadoop f
我正在尝试以 JSON 格式将数据帧写入 s3 位置。但是,每当执行程序任务失败并且 Spark 重试该阶段时,它就会抛出 FileAlreadyExistsException。 A similar
我们正在使用 aws-java-sdk-1.7.4.jar hadoop-aws-2.7.5.jar 运行 Apache Spark 作业,以将 parquet 文件写入 S3 存储桶。 我们在 s3
我正在尝试将文件从InputStream复制到本地目录中。我创建了一个名为 test 的本地目录,它位于我的包根目录中。 public void copyFileFromInputStream(Inp
This question问了类似的问题。但是,就我而言,在调用 Files.createDirectories() 之前或之后该目录不存在。这种情况发生在 Oracle JDK 10.0.2 上。
即使成功运行了几十次 spark 程序,在最新的 sbt 包之后,最新的运行在启动 SparkContext 时出现 FileAlreadyExistsException: Note: I had r
我正在运行 dataFrame.rdd.saveAsTextFile("/home/hadoop/test") 试图将数据帧写入磁盘。这执行没有错误,但未创建文件夹。此外,当我再次运行相同的命令时(在
我使用以下代码通过 pyspark Insertinto 函数将数据帧写入配置单元分区表。 spark.conf.set("spark.sql.sources.partitionOverwriteMo
我在 reducer 中使用 MultipleOutputs。多重输出会将文件写入名为 NewIdentities 的文件夹。代码如下所示: private MultipleOutputs mos;
我正在写代码我正在用java nio api创建一个目录我的代码段是 Path target = Paths.get(""+folder_path+xx[0]); Set perms =
我试图在给定 here 的 Hadoop 中运行示例程序 当我尝试运行它时,我得到一个 org.apache.hadoop.mapred.FileAlreadyExistsException emil
我正在创建一个回滚功能,这是我想要实现的: 在与 data 文件夹 相同的位置创建一个 tmp 文件夹; 在执行任何操作之前,我将所有内容从data 文件夹 复制到tmp 文件夹(少量数据)。 回滚时
我正在尝试在 Amazon EMR 中运行 WordCount 程序,但我收到错误消息: Exception in thread "main" org.apache.hadoop.mapred.Fil
我是一名优秀的程序员,十分优秀!