
java - Flink ParquetSinkWriter FileAlreadyExistsException

Reposted · Author: 行者123 · Updated: 2023-12-01 18:33:19

I am trying to write Parquet files to HDFS with Apache Flink, using a BucketingSink and a custom ParquetSinkWriter.

Here is the code. The error below shows that when checkpointing is enabled (i.e. when snapshotState() is invoked in the BucketingSink class), the flush() method below does not complete cleanly. Even though the writer is closed with "writer.close();", the error is still thrown from "writer = createWriter();". Any ideas? Thanks.

The error looks like this:

org.apache.hadoop.fs.FileAlreadyExistsException: /user/hive/flink_parquet_fils_with_checkingpoint/year=20/month=2/day=1/hour=17/_part-4-9.in-progress for client 192.168.56.202 already exists
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInternal(FSNamesystem.java:3003)
    at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.startFileInt(FSNamesystem.java:2890)
    ...
    at flink.untils.ParquetSinkWriter.flush(ParquetSinkWriterForecast.java:81)
    at org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.snapshotState(BucketingSink.java:749)

import org.apache.flink.streaming.connectors.fs.Writer; // missing in the original snippet
import org.apache.flink.util.Preconditions;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;

import java.io.IOException;

/**
 * Parquet writer.
 *
 * @param <T> record type
 */
public class ParquetSinkWriter<T extends GenericRecord> implements Writer<T> {

    private static final long serialVersionUID = -975302556515811398L;

    private final CompressionCodecName compressionCodecName = CompressionCodecName.SNAPPY;
    private final int pageSize = 64 * 1024;

    private final String schemaRepresentation;

    private transient Schema schema;
    private transient ParquetWriter<GenericRecord> writer;
    private transient Path path;

    private int position;

    public ParquetSinkWriter(String schemaRepresentation) {
        this.schemaRepresentation = Preconditions.checkNotNull(schemaRepresentation);
    }

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        this.position = 0;
        this.path = path;

        if (writer != null) {
            writer.close();
        }

        writer = createWriter();
    }

    @Override
    public long flush() throws IOException {
        Preconditions.checkNotNull(writer);
        position += writer.getDataSize();
        writer.close();
        // Re-creating the writer at the same path is where the
        // FileAlreadyExistsException is thrown.
        writer = createWriter();

        return position;
    }

    @Override
    public long getPos() throws IOException {
        Preconditions.checkNotNull(writer);
        return position + writer.getDataSize();
    }

    @Override
    public void close() throws IOException {
        if (writer != null) {
            writer.close();
            writer = null;
        }
    }

    @Override
    public void write(T element) throws IOException {
        Preconditions.checkNotNull(writer);
        writer.write(element);
    }

    @Override
    public Writer<T> duplicate() {
        return new ParquetSinkWriter<>(schemaRepresentation);
    }

    private ParquetWriter<GenericRecord> createWriter() throws IOException {
        if (schema == null) {
            schema = new Schema.Parser().parse(schemaRepresentation);
        }

        return AvroParquetWriter.<GenericRecord>builder(path)
                .withSchema(schema)
                .withDataModel(new GenericData())
                .withCompressionCodec(compressionCodecName)
                .withPageSize(pageSize)
                .build();
    }
}
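The failure mode in flush() can be reproduced without Hadoop. This is a minimal java.nio analogy (the class CreateModeDemo is hypothetical, not part of the question's code): closing a file and then re-creating it at the same path with create-only semantics, the way flush() calls createWriter() on the unchanged in-progress path, throws a FileAlreadyExistsException, just as the HDFS NameNode does in the stack trace above.

```java
import java.io.IOException;
import java.nio.file.FileAlreadyExistsException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Analogy for the failing flush(): Parquet's default write mode (CREATE)
// refuses to open a path that already exists, like CREATE_NEW here.
public class CreateModeDemo {

    // Returns true if re-creating the same path fails with
    // FileAlreadyExistsException, mirroring flush() -> createWriter().
    public static boolean recreateFails(Path p) throws IOException {
        Files.write(p, new byte[]{1}, StandardOpenOption.CREATE_NEW); // first open: ok
        try {
            // second open at the same, unchanged path
            Files.write(p, new byte[]{2}, StandardOpenOption.CREATE_NEW);
            return false;
        } catch (FileAlreadyExistsException e) {
            return true; // same class of error as the HDFS one above
        }
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempDirectory("demo").resolve("part-4-9.in-progress");
        System.out.println(recreateFails(p)); // prints true
    }
}
```

Closing the writer does not help, because the path itself is unchanged between close() and createWriter(), so the second create collides with the file the first one left behind.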


Best answer

It seems the file you are trying to create already exists. This happens because you are using the default write mode, CREATE, which fails when the file exists. What you can try is changing your code to use the OVERWRITE mode. You can change the createWriter() method to return something like the following (note that ParquetFileWriter.Mode requires importing org.apache.parquet.hadoop.ParquetFileWriter):

return AvroParquetWriter.<GenericRecord>builder(path)
        .withSchema(schema)
        .withDataModel(new GenericData())
        .withCompressionCodec(compressionCodecName)
        .withPageSize(pageSize)
        .withWriteMode(ParquetFileWriter.Mode.OVERWRITE)
        .build();
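The same java.nio analogy shows why OVERWRITE resolves the error (the class OverwriteModeDemo is hypothetical; it assumes overwrite mode behaves like truncate-on-open, which is what java.nio's default write options do): reopening an existing path succeeds and replaces the previous contents instead of throwing.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

// Analogy for the fix: OVERWRITE-style semantics truncate and reuse the
// existing file, so the second open at the same path no longer throws.
public class OverwriteModeDemo {

    // Writes twice to the same path; the second write overwrites the first.
    public static byte[] reopenAndOverwrite(Path p) throws IOException {
        Files.write(p, new byte[]{1});    // first open creates the file
        Files.write(p, new byte[]{2, 3}); // default = CREATE + TRUNCATE_EXISTING: succeeds
        return Files.readAllBytes(p);     // only the second write's bytes remain
    }

    public static void main(String[] args) throws IOException {
        Path p = Files.createTempDirectory("demo").resolve("part.parquet");
        System.out.println(reopenAndOverwrite(p).length); // prints 2
    }
}
```

One caveat worth keeping in mind: overwriting silently discards whatever was in the in-progress file, so this trades the exception for the loss of any data the previous writer had flushed to that path.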

Regarding "java - Flink ParquetSinkWriter FileAlreadyExistsException", a similar question was found on Stack Overflow: https://stackoverflow.com/questions/60120100/
