
gzip - How can Flink fault-tolerantly sink data to HDFS as gzip-compressed files?

Reposted · Author: 行者123 · Updated: 2023-12-04 14:19:29

We want to write compressed data to HDFS via Flink's BucketingSink or StreamingFileSink. I have written my own Writer, and it works fine as long as no failure occurs. However, when it hits a failure and restarts from a checkpoint, it produces a valid-length file (Hadoop < 2.7) or a truncated file. Unfortunately, gzip is a binary format with a trailer at the end of the file, so simple truncation does not work in my case. Any ideas for enabling exactly-once semantics for a compressing HDFS sink?
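To make the failure mode concrete: a gzip file ends with an 8-byte trailer (CRC-32 plus uncompressed size), so a part file that gets truncated during checkpoint recovery is no longer a valid archive. A minimal JDK-only sketch (class name is illustrative):

```java
import java.io.*;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class TruncatedGzipDemo {

    public static void main(String[] args) throws IOException {
        // Compress a small payload into memory.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(buf)) {
            gz.write("hello gzip\n".getBytes("UTF-8"));
        }
        byte[] full = buf.toByteArray();

        // Simulate restore-time truncation: drop the 8-byte trailer
        // plus one byte of the deflate stream.
        byte[] truncated = Arrays.copyOf(full, full.length - 9);

        // The intact file decompresses fine.
        System.out.println(readFirstLine(full));

        // The truncated file fails: the inflater runs out of input.
        try {
            readFirstLine(truncated);
        } catch (EOFException e) {
            System.out.println("EOFException on truncated stream");
        }
    }

    static String readFirstLine(byte[] data) throws IOException {
        try (BufferedReader r = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(new ByteArrayInputStream(data)), "UTF-8"))) {
            return r.readLine();
        }
    }
}
```

This is why the valid-length/truncation trick that works for plain-text part files cannot repair a gzip part file.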

Here is the code for my writer:

import java.io.IOException;
import java.util.zip.GZIPOutputStream;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import com.alibaba.fastjson.JSONObject; // assumed: getString/containsKey match the fastjson API

public class HdfsCompressStringWriter extends StreamWriterBaseV2<JSONObject> {

    private static final long serialVersionUID = 2L;

    /**
     * The {@code GZIPOutputStream} for the current part file.
     */
    private transient GZIPOutputStream compressionOutputStream;

    public HdfsCompressStringWriter() {}

    @Override
    public void open(FileSystem fs, Path path) throws IOException {
        super.open(fs, path);
        this.setSyncOnFlush(true);
        // syncFlush = true so flush() forces buffered compressed bytes out
        compressionOutputStream = new GZIPOutputStream(this.getStream(), true);
    }

    @Override
    public void close() throws IOException {
        if (compressionOutputStream != null) {
            compressionOutputStream.close();
            compressionOutputStream = null;
        }
        resetStream();
    }

    @Override
    public void write(JSONObject element) throws IOException {
        if (element == null || !element.containsKey("body")) {
            return;
        }
        String content = element.getString("body") + "\n";
        compressionOutputStream.write(content.getBytes());
        compressionOutputStream.flush();
    }

    @Override
    public Writer<JSONObject> duplicate() {
        return new HdfsCompressStringWriter();
    }
}

Best Answer

I would suggest implementing a BulkWriter for the StreamingFileSink which compresses the elements via a GZIPOutputStream. The code could look like the following:

public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    env.enableCheckpointing(1000);

    final DataStream<Integer> input = env.addSource(new InfinitySource());

    final StreamingFileSink<Integer> streamingFileSink = StreamingFileSink
        .<Integer>forBulkFormat(new Path("output"), new GzipBulkWriterFactory<>())
        .build();
    input.addSink(streamingFileSink);

    env.execute();
}

private static class GzipBulkWriterFactory<T> implements BulkWriter.Factory<T> {

    @Override
    public BulkWriter<T> create(FSDataOutputStream fsDataOutputStream) throws IOException {
        final GZIPOutputStream gzipOutputStream = new GZIPOutputStream(fsDataOutputStream, true);
        return new GzipBulkWriter<>(new ObjectOutputStream(gzipOutputStream), gzipOutputStream);
    }
}

private static class GzipBulkWriter<T> implements BulkWriter<T> {

    private final GZIPOutputStream gzipOutputStream;
    private final ObjectOutputStream objectOutputStream;

    public GzipBulkWriter(ObjectOutputStream objectOutputStream, GZIPOutputStream gzipOutputStream) {
        this.gzipOutputStream = gzipOutputStream;
        this.objectOutputStream = objectOutputStream;
    }

    @Override
    public void addElement(T t) throws IOException {
        objectOutputStream.writeObject(t);
    }

    @Override
    public void flush() throws IOException {
        objectOutputStream.flush();
    }

    @Override
    public void finish() throws IOException {
        objectOutputStream.flush();
        gzipOutputStream.finish();
    }
}
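One detail worth calling out: both the question's writer and this answer pass syncFlush = true to the GZIPOutputStream constructor. Without it, flush() only flushes the wrapped stream while the Deflater keeps compressed data buffered internally, so recently written records would not yet be on HDFS at checkpoint time. A JDK-only sketch of the difference (class name is illustrative):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.zip.GZIPOutputStream;

public class SyncFlushDemo {

    // Write 100 lines, call flush(), and count how many compressed
    // bytes actually reached the underlying stream (no finish() yet).
    static int bytesAfterFlush(boolean syncFlush) throws IOException {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        GZIPOutputStream gz = new GZIPOutputStream(sink, syncFlush);
        for (int i = 0; i < 100; i++) {
            gz.write("some line of json\n".getBytes("UTF-8"));
        }
        gz.flush();
        return sink.size();
    }

    public static void main(String[] args) throws IOException {
        int plain = bytesAfterFlush(false); // deflater keeps data buffered
        int sync = bytesAfterFlush(true);   // SYNC_FLUSH pushes it out
        System.out.println("syncFlush wrote more bytes: " + (sync > plain));
    }
}
```

finish() then writes the gzip trailer exactly once per part file when the sink rolls it, which is how the bulk format ends up with valid archives even across restarts.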

Regarding "gzip - How can Flink fault-tolerantly sink data to HDFS as gzip-compressed files?", we found a similar question on Stack Overflow: https://stackoverflow.com/questions/56474706/
